Repository: accumulo Updated Branches: refs/heads/master 6cbe886e3 -> 8c2294df8
ACCUMULO-3420 Metrics Gathering Object Added Signed-off-by: Christopher Tubbs <ctubb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8c2294df Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8c2294df Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8c2294df Branch: refs/heads/master Commit: 8c2294df8a3e602f1571861196ff92fddeecf350 Parents: 6cbe886 Author: Jenna Huston <jenna.husto...@gmail.com> Authored: Wed Dec 17 11:47:51 2014 -0500 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Thu Feb 5 12:23:05 2015 -0500 ---------------------------------------------------------------------- .../core/file/rfile/MetricsGatherer.java | 87 ++++ .../accumulo/core/file/rfile/PrintInfo.java | 51 +- .../apache/accumulo/core/file/rfile/RFile.java | 50 +- .../core/file/rfile/VisMetricsGatherer.java | 172 +++++++ .../core/file/rfile/VisibilityMetric.java | 73 +++ .../core/file/rfile/RFileMetricsTest.java | 515 +++++++++++++++++++ .../accumulo/core/file/rfile/RFileTest.java | 30 +- 7 files changed, 953 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java new file mode 100644 index 0000000..bfda9aa --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +/** + * Interface used to gather metrics from RFiles. + * + * @param <T> + * Type used to return metrics in getMetrics(). This does not impact collection of metrics at all, is only used in that method. + */ +public interface MetricsGatherer<T> { + + /** + * Initialize the gatherer when it is registered with the RFile Reader + * + * @param cf + * Map of the LocalityGroup names to their column families + */ + void init(Map<String,ArrayList<ByteSequence>> cf); + + /** + * Start a new LocalityGroup. This method is used when the RFile seeks to the next LocalityGroup. + * + * @param cf + * Text object of the column family of the first entry in the locality group + */ + void startLocalityGroup(Text cf); + + /** + * Collect and store metrics for the given entry. + * + * @param key + * Key object of the entry you are collecting metrics from + * + * @param val + * Value object of the entry you are collecting metrics from + * + */ + void addMetric(Key key, Value val); + + /** + * Start a new block within a LocalityGroup. 
This method is used when the RFile moves on to the next block in the LocalityGroup. + */ + void startBlock(); + + /** + * Print the results of the metrics gathering by locality group in the format: Metric name Number of keys Percentage of keys Number of blocks Percentage of + * blocks + * + * @param hash + * Boolean to determine whether the values being printed should be hashed + * @param metricWord + * String of the name of the metric that was collected + * @param out + * PrintStream of where the information should be written to + */ + void printMetrics(boolean hash, String metricWord, PrintStream out); + + /** + * @return the metrics gathered + */ + T getMetrics(); + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 9ff1dd2..591d477 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -18,6 +18,8 @@ package org.apache.accumulo.core.file.rfile; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -47,6 +49,10 @@ public class PrintInfo implements KeywordExecutable { static class Opts extends Help { @Parameter(names = {"-d", "--dump"}, description = "dump the key/value pairs") boolean dump = false; + @Parameter(names = {"-v", "--vis"}, description = "show visibility metrics") + boolean vis = false; + @Parameter(names = {"--visHash"}, description = "show visibilities as hashes, implies -v") + boolean hash = false; @Parameter(names = {"--histogram"}, description = "print a histogram of the 
key-value sizes") boolean histogram = false; @Parameter(description = " <file> { <file> ... }") @@ -98,29 +104,46 @@ public class PrintInfo implements KeywordExecutable { CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf); Reader iter = new RFile.Reader(_rdr); + MetricsGatherer<Map<String, ArrayList<VisibilityMetric>>> vmg = new VisMetricsGatherer(); + + if (opts.vis || opts.hash) + iter.registerMetrics(vmg); iter.printInfo(); System.out.println(); org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg}); - if (opts.histogram || opts.dump) { - iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false); - while (iter.hasTop()) { - Key key = iter.getTopKey(); - Value value = iter.getTopValue(); - if (opts.dump) - System.out.println(key + " -> " + value); - if (opts.histogram) { - long size = key.getSize() + value.getSize(); - int bucket = (int) Math.log10(size); - countBuckets[bucket]++; - sizeBuckets[bucket] += size; - totalSize += size; + Map<String, ArrayList<ByteSequence>> localityGroupCF = null; + + if (opts.histogram || opts.dump || opts.vis || opts.hash) { + localityGroupCF = iter.getLocalityGroupCF(); + + for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) { + + iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true); + while (iter.hasTop()) { + Key key = iter.getTopKey(); + Value value = iter.getTopValue(); + if (opts.dump) + System.out.println(key + " -> " + value); + if (opts.histogram) { + long size = key.getSize() + value.getSize(); + int bucket = (int) Math.log10(size); + countBuckets[bucket]++; + sizeBuckets[bucket] += size; + totalSize += size; + } + iter.next(); } - iter.next(); } } + System.out.println(); + iter.close(); + + if (opts.vis || opts.hash) + vmg.printMetrics(opts.hash, "Visibility", System.out); + if (opts.histogram) { System.out.println("Up to size count %-age"); for (int i = 1; i < countBuckets.length; i++) { 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 0b464d8..888924d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -554,6 +554,8 @@ public class RFile { if (entriesLeft == 0) { currBlock.close(); + if (metricsGatherer != null) + metricsGatherer.startBlock(); if (iiter.hasNext()) { IndexEntry indexEntry = iiter.next(); @@ -561,7 +563,7 @@ public class RFile { currBlock = getDataBlock(indexEntry); checkRange = range.afterEndKey(indexEntry.getKey()); - if (!checkRange) + if (!checkRange) hasTop = true; } else { @@ -575,6 +577,10 @@ public class RFile { prevKey = rk.getKey(); rk.readFields(currBlock); val.readFields(currBlock); + + if (metricsGatherer != null) + metricsGatherer.addMetric(rk.getKey(), val); + entriesLeft--; if (checkRange) hasTop = !range.afterEndKey(rk.getKey()); @@ -760,6 +766,11 @@ public class RFile { while (hasTop() && range.beforeStartKey(getTopKey())) { next(); } + + if (metricsGatherer != null) { + metricsGatherer.startLocalityGroup(rk.getKey().getColumnFamily()); + metricsGatherer.addMetric(rk.getKey(), val); + } } @Override @@ -803,6 +814,12 @@ public class RFile { public InterruptibleIterator getIterator() { return this; } + + private MetricsGatherer<?> metricsGatherer; + + public void registerMetrics(MetricsGatherer<?> vmg) { + metricsGatherer = vmg; + } } public static class Reader extends HeapIterator implements FileSKVIterator { @@ -973,8 +990,38 @@ public class RFile { } + public Map<String,ArrayList<ByteSequence>> getLocalityGroupCF() { + Map<String,ArrayList<ByteSequence>> cf = new HashMap<>(); + + for (LocalityGroupMetadata lcg : 
localityGroups) { + ArrayList<ByteSequence> setCF = new ArrayList<ByteSequence>(); + + for (Entry<ByteSequence,MutableLong> entry : lcg.columnFamilies.entrySet()) { + setCF.add(entry.getKey()); + } + + cf.put(lcg.name, setCF); + } + + return cf; + } + private int numLGSeeked = 0; + /** + * Method that registers the given MetricsGatherer. You can only register one as it will clobber any previously set. The MetricsGatherer should be + * registered before iterating through the LocalityGroups. + * + * @param vmg + * MetricsGatherer to be registered with the LocalityGroupReaders + */ + public void registerMetrics(MetricsGatherer<?> vmg) { + vmg.init(getLocalityGroupCF()); + for (LocalityGroupReader lgr : lgReaders) { + lgr.registerMetrics(vmg); + } + } + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive); @@ -999,7 +1046,6 @@ public class RFile { for (LocalityGroupMetadata lgm : localityGroups) { lgm.printInfo(); } - } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java new file mode 100644 index 0000000..6050e41 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Charsets; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import com.google.common.util.concurrent.AtomicLongMap; + +/** + * This class provides visibility metrics per locality group. The Map in getMetrics() maps the locality group name to an ArrayList of VisibilityMetric objects. + * These contain the components of a visibility metric; the visibility as a String, the number of times that is seen in a locality group, the percentage of keys + * that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks in + * the locality group that contain the visibility. 
+ */ +public class VisMetricsGatherer implements MetricsGatherer<Map<String,ArrayList<VisibilityMetric>>> { + + protected Map<String,AtomicLongMap<String>> metric; + protected Map<String,AtomicLongMap<String>> blocks; + protected ArrayList<Long> numEntries; + protected ArrayList<Integer> numBlocks; + private ArrayList<String> inBlock; + protected ArrayList<String> localityGroups; + private int numLG; + private Map<String,ArrayList<ByteSequence>> localityGroupCF; + + public VisMetricsGatherer() { + metric = new HashMap<>(); + blocks = new HashMap<>(); + numEntries = new ArrayList<>(); + numBlocks = new ArrayList<>(); + inBlock = new ArrayList<>(); + localityGroups = new ArrayList<>(); + numLG = 0; + } + + @Override + public void init(Map<String,ArrayList<ByteSequence>> cf) { + localityGroupCF = cf; + } + + @Override + public void startLocalityGroup(Text oneCF) { + String name = null; + ByteSequence cf = new ArrayByteSequence(oneCF.toString()); + for (Entry<String,ArrayList<ByteSequence>> entry : localityGroupCF.entrySet()) { + if (entry.getValue().contains(cf)) { + if (entry.getKey() == null) + name = null; + else + name = entry.getKey().toString(); + break; + } + } + localityGroups.add(name); + metric.put(name, AtomicLongMap.create(new HashMap<String,Long>())); + blocks.put(name, AtomicLongMap.create(new HashMap<String,Long>())); + numLG++; + numEntries.add((long) 0); + numBlocks.add(0); + } + + @Override + public void addMetric(Key key, Value val) { + String myMetric = key.getColumnVisibility().toString(); + String currLG = localityGroups.get(numLG - 1); + if (metric.get(currLG).containsKey(myMetric)) { + metric.get(currLG).getAndIncrement(myMetric); + } else + metric.get(currLG).put(myMetric, 1); + + numEntries.set(numLG - 1, numEntries.get(numLG - 1) + 1); + + if (!inBlock.contains(myMetric) && blocks.get(currLG).containsKey(myMetric)) { + blocks.get(currLG).incrementAndGet(myMetric); + inBlock.add(myMetric); + } else if (!inBlock.contains(myMetric) && 
!blocks.get(currLG).containsKey(myMetric)) { + blocks.get(currLG).put(myMetric, 1); + inBlock.add(myMetric); + } + + } + + @Override + public void startBlock() { + inBlock.clear(); + numBlocks.set(numLG - 1, numBlocks.get(numLG - 1) + 1); + } + + @Override + public void printMetrics(boolean hash, String metricWord, PrintStream out) { + for (int i = 0; i < numLG; i++) { + String lGName = localityGroups.get(i); + out.print("Locality Group: "); + if (lGName == null) + out.println("<DEFAULT>"); + else + out.println(localityGroups.get(i)); + out.printf("%-27s", metricWord); + out.println("Number of keys" + "\t " + "Percent of keys" + "\t" + "Number of blocks" + "\t" + "Percent of blocks"); + for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) { + HashFunction hf = Hashing.md5(); + HashCode hc = hf.newHasher().putString(entry.getKey(), Charsets.UTF_8).hash(); + if (hash) + out.printf("%-20s", hc.toString().substring(0, 8)); + else + out.printf("%-20s", entry.getKey()); + out.print("\t\t" + entry.getValue() + "\t\t\t"); + out.printf("%.2f", ((double) entry.getValue() / numEntries.get(i)) * 100); + out.print("%\t\t\t"); + + long blocksIn = blocks.get(lGName).get(entry.getKey()); + + out.print(blocksIn + "\t\t "); + out.printf("%.2f", ((double) blocksIn / numBlocks.get(i)) * 100); + out.print("%"); + + out.println(""); + } + out.println("Number of keys: " + numEntries.get(i)); + out.println(); + } + } + + @Override + public Map<String,ArrayList<VisibilityMetric>> getMetrics() { + Map<String,ArrayList<VisibilityMetric>> getMetrics = new HashMap<>(); + for (int i = 0; i < numLG; i++) { + String lGName = localityGroups.get(i); + ArrayList<VisibilityMetric> rows = new ArrayList<>(); + for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) { + long vis = entry.getValue(); + double visPer = ((double) entry.getValue() / numEntries.get(i)) * 100; + + long blocksIn = blocks.get(lGName).get(entry.getKey()); + double blocksPer = ((double) blocksIn / 
numBlocks.get(i)) * 100; + + rows.add(new VisibilityMetric(entry.getKey(), vis, visPer, blocksIn, blocksPer)); + } + getMetrics.put(lGName, rows); + } + return getMetrics; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java new file mode 100644 index 0000000..ab7b1d7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +/** + * Class that holds the components of a visibility metric. The String visibility, the number of times that is seen in a locality group, the percentage of keys + * that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks in + * the locality group that contain the visibility. 
+ */ +public class VisibilityMetric { + + private long visLG, visBlock; + private double visLGPer, visBlockPer; + private String visibility; + + public VisibilityMetric(String visibility, long visLG, double visLGPer, long visBlock, double visBlockPer) { + this.visibility = visibility; + this.visLG = visLG; + this.visLGPer = visLGPer; + this.visBlock = visBlock; + this.visBlockPer = visBlockPer; + } + + /** + * @return the visibility + */ + public String getVisibility() { + return visibility; + } + + /** + * @return the visLG + */ + public long getVisLG() { + return visLG; + } + + /** + * @return the visBlock + */ + public long getVisBlock() { + return visBlock; + } + + /** + * @return the visLGPer + */ + public double getVisLGPer() { + return visLGPer; + } + + /** + * @return the visBlockPer + */ + public double getVisBlockPer() { + return visBlockPer; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java new file mode 100644 index 0000000..e66210b --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.google.common.util.concurrent.AtomicLongMap; + +/** + * + */ +public class RFileMetricsTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + static { + Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF); + Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF); + } + + public static class TestRFile extends RFileTest.TestRFile { + + public TestRFile(AccumuloConfiguration accumuloConfiguration) { + super(accumuloConfiguration); + } + + public VisMetricsGatherer gatherMetrics() throws IOException { + VisMetricsGatherer vmg = new VisMetricsGatherer(); + reader.registerMetrics(vmg); + Map<String,ArrayList<ByteSequence>> localityGroupCF = reader.getLocalityGroupCF(); + + for (Entry<String,ArrayList<ByteSequence>> cf : 
localityGroupCF.entrySet()) { + + reader.seek(new Range((Key) null, (Key) null), cf.getValue(), true); + while (reader.hasTop()) { + reader.next(); + } + } + return vmg; + } + } + + public AccumuloConfiguration conf = null; + + @Test + public void emptyFile() throws IOException { + + // test an empty file + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + Map<String,AtomicLongMap<String>> metrics = vmg.metric; + Map<String,AtomicLongMap<String>> blocks = vmg.blocks; + assertEquals(0, metrics.size()); + + assertEquals(0, blocks.size()); + + trf.closeReader(); + } + + @Test + public void oneEntryDefaultLocGroup() throws IOException { + + // test an rfile with one entry in the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get(null); + AtomicLongMap<String> blocks = vmg.blocks.get(null); + assertEquals(1, metrics.get("L1")); + + assertEquals(1, blocks.get("L1")); + + assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + + trf.closeReader(); + } + + @Test + public void twoEntriesDefaultLocGroup() throws IOException { + + // test an rfile with two entries in the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get(null); + 
AtomicLongMap<String> blocks = vmg.blocks.get(null); + assertEquals(1, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + + trf.closeReader(); + + } + + @Test + public void oneEntryNonDefaultLocGroup() throws IOException { + + // test an rfile with one entry in a non-default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false); + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(1, metrics.get("L1")); + + assertEquals(1, blocks.get("L1")); + + assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + trf.closeReader(); + + } + + @Test + public void twoEntryNonDefaultLocGroup() throws IOException { + + // test an rfile with two entries in a non-default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false); + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = 
vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(1, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + trf.closeReader(); + + } + + @Test + public void twoNonDefaultLocGroups() throws IOException { + + // test an rfile with two entries in 2 non-default locality groups + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false); + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo")); + + Set<ByteSequence> lg2 = new HashSet<>(); + lg2.add(new ArrayByteSequence("cf2")); + + trf.writer.startNewLocalityGroup("lg2", lg2); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L2", 55), RFileTest.nv("foo")); + + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(1, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + metrics = vmg.metric.get("lg2"); + blocks = vmg.blocks.get("lg2"); + assertEquals(1, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, 
blocks.get("L2")); + + assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg2")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg2")).longValue()); + + trf.closeReader(); + + } + + @Test + public void nonDefaultAndDefaultLocGroup() throws IOException { + + // test an rfile with 3 entries in a non-default locality group and the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false); + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L2", 55), RFileTest.nv("foo")); + + trf.writer.startDefaultLocalityGroup(); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo")); + + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(2, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(3, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + metrics = vmg.metric.get(null); + blocks = vmg.blocks.get(null); + assertEquals(1, metrics.get("A")); + assertEquals(1, metrics.get("B")); + + assertEquals(1, blocks.get("A")); + assertEquals(1, blocks.get("B")); + + assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + 
+ trf.closeReader(); + + } + + @Test + public void multiCFNonDefaultAndDefaultLocGroup() throws IOException { + + // test an rfile with multiple column families in a non-default locality group and the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false); + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + lg1.add(new ArrayByteSequence("cf3")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo")); + + trf.writer.startDefaultLocalityGroup(); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo")); + + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(3, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(1, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + metrics = vmg.metric.get(null); + blocks = vmg.blocks.get(null); + assertEquals(2, metrics.get("A")); + assertEquals(2, metrics.get("B")); + + assertEquals(1, blocks.get("A")); + assertEquals(1, blocks.get("B")); + + assertEquals(4, 
vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + + trf.closeReader(); + + } + + @Test + public void multiBlockDefaultLocGroup() throws IOException { + + // test an rfile with four blocks in the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(20);// Each entry is a block + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get(null); + AtomicLongMap<String> blocks = vmg.blocks.get(null); + assertEquals(3, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(3, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + + trf.closeReader(); + + } + + @Test + public void multiBlockNonDefaultLocGroup() throws IOException { + + // test an rfile with four blocks in a non-default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false, 20);// Each entry is a block + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + lg1.add(new ArrayByteSequence("cf3")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo")); + 
trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo")); + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(3, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(3, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + trf.closeReader(); + + } + + @Test + public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException { + + // test an rfile with multiple column families and multiple blocks in a non-default locality group and the default locality group + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(false, 20);// Each entry is a block + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence("cf1")); + lg1.add(new ArrayByteSequence("cf3")); + + trf.writer.startNewLocalityGroup("lg1", lg1); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo")); + + trf.writer.startDefaultLocalityGroup(); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo")); + trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo")); + + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + 
AtomicLongMap<String> metrics = vmg.metric.get("lg1"); + AtomicLongMap<String> blocks = vmg.blocks.get("lg1"); + assertEquals(3, metrics.get("L1")); + assertEquals(1, metrics.get("L2")); + + assertEquals(3, blocks.get("L1")); + assertEquals(1, blocks.get("L2")); + + assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue()); + assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue()); + + metrics = vmg.metric.get(null); + blocks = vmg.blocks.get(null); + assertEquals(2, metrics.get("A")); + assertEquals(2, metrics.get("B")); + + assertEquals(2, blocks.get("A")); + assertEquals(2, blocks.get("B")); + + assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue()); + assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue()); + + trf.closeReader(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 1a83f33..eafadc0 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -169,13 +169,13 @@ public class RFileTest { public static class TestRFile { - private Configuration conf = CachedConfiguration.getInstance(); + protected Configuration conf = CachedConfiguration.getInstance(); public RFile.Writer writer; - private ByteArrayOutputStream baos; - private FSDataOutputStream dos; - private SeekableByteArrayInputStream bais; - private FSDataInputStream in; - private AccumuloConfiguration accumuloConfiguration; + protected ByteArrayOutputStream baos; + protected FSDataOutputStream dos; + protected SeekableByteArrayInputStream bais; + protected FSDataInputStream in; + 
protected AccumuloConfiguration accumuloConfiguration; public Reader reader; public SortedKeyValueIterator<Key,Value> iter; @@ -186,18 +186,25 @@ public class RFileTest { } public void openWriter(boolean startDLG) throws IOException { + openWriter(startDLG, 1000); + } + public void openWriter(boolean startDLG, int blockSize) throws IOException { baos = new ByteArrayOutputStream(); dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration); - writer = new RFile.Writer(_cbw, 1000, 1000); + writer = new RFile.Writer(_cbw, blockSize, 1000); if (startDLG) writer.startDefaultLocalityGroup(); } public void openWriter() throws IOException { - openWriter(true); + openWriter(true, 1000); + } + + public void openWriter(int blockSize) throws IOException { + openWriter(true, blockSize); } public void closeWriter() throws IOException { @@ -210,6 +217,10 @@ public class RFileTest { } public void openReader() throws IOException { + openReader(true); + } + + public void openReader(boolean cfsi) throws IOException { int fileLength = 0; byte[] data = null; @@ -224,7 +235,8 @@ public class RFileTest { CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()); reader = new RFile.Reader(_cbr); - iter = new ColumnFamilySkippingIterator(reader); + if (cfsi) + iter = new ColumnFamilySkippingIterator(reader); checkIndex(reader); }