Repository: accumulo Updated Branches: refs/heads/master a1cf8e222 -> 861db793d
ACCUMULO-4597 fixed bug in rfile-info Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de80cf53 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de80cf53 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de80cf53 Branch: refs/heads/master Commit: de80cf53b66f58a9a99d8a9dd54b5a716fa8df44 Parents: c3a0d1d Author: Keith Turner <ktur...@apache.org> Authored: Mon Mar 6 18:48:29 2017 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Mar 6 18:48:29 2017 -0500 ---------------------------------------------------------------------- .../accumulo/core/file/rfile/PrintInfo.java | 7 +- .../apache/accumulo/core/file/rfile/RFile.java | 18 +++++- .../accumulo/core/util/LocalityGroupUtil.java | 34 ++++++++++ .../core/file/rfile/RFileMetricsTest.java | 68 ++++++++++++++++++-- 4 files changed, 116 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 0f2a935..366e4a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.file.rfile; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.DefaultConfiguration; @@ -30,6 +29,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import 
org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -119,9 +119,8 @@ public class PrintInfo implements KeywordExecutable { if (opts.histogram || opts.dump || opts.vis || opts.hash) { localityGroupCF = iter.getLocalityGroupCF(); - for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) { - - iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true); + for (String lgName : localityGroupCF.keySet()) { + LocalityGroupUtil.seek(iter, new Range(), lgName, localityGroupCF); while (iter.hasTop()) { Key key = iter.getTopKey(); Value value = iter.getTopValue(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 851226c..bab2266 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.iterators.system.HeapIterator; import org.apache.accumulo.core.iterators.system.InterruptibleIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup; +import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.MutableByteSequence; import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.math.stat.descriptive.SummaryStatistics; @@ -69,6 +70,8 @@ import org.apache.hadoop.io.Writable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.google.common.base.Preconditions; + public class RFile { public static final String EXTENSION = "rf"; @@ -1015,14 +1018,23 @@ public class RFile { } + /** + * @return map of locality group names to column families. The default locality group will have {@code null} for a name. RFile will only track up to + * {@value Writer#MAX_CF_IN_DLG} families for the default locality group. After this it will stop tracking. For the case where the default group has + * more than {@value Writer#MAX_CF_IN_DLG} families an empty list of families is returned. + * @see LocalityGroupUtil#seek(Reader, Range, String, Map) + */ public Map<String,ArrayList<ByteSequence>> getLocalityGroupCF() { Map<String,ArrayList<ByteSequence>> cf = new HashMap<>(); for (LocalityGroupMetadata lcg : localityGroups) { - ArrayList<ByteSequence> setCF = new ArrayList<>(); + ArrayList<ByteSequence> setCF; - for (Entry<ByteSequence,MutableLong> entry : lcg.columnFamilies.entrySet()) { - setCF.add(entry.getKey()); + if (lcg.columnFamilies == null) { + Preconditions.checkState(lcg.isDefaultLG, " Group %s has null families. 
Only expect default locality group to have null families.", lcg.name); + setCF = new ArrayList<>(); + } else { + setCF = new ArrayList<>(lcg.columnFamilies.keySet()); } cf.put(lcg.name, setCF); http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java index 5d06e69..a99fe01 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.util; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -36,7 +37,9 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.io.Text; @@ -294,4 +297,35 @@ public class LocalityGroupUtil { } } + /** + * This method created to help seek an rfile for a locality group obtained from {@link Reader#getLocalityGroupCF()}. This method can possibly return an empty + * list for the default locality group. When this happens the default locality group needs to be seeked differently. This method helps do that. + * + * <p> + * For the default locality group will seek using the families of all other locality groups non-inclusive. 
+ * + * @see Reader#getLocalityGroupCF() + */ + public static void seek(Reader reader, Range range, String lgName, Map<String,ArrayList<ByteSequence>> localityGroupCF) throws IOException { + + Collection<ByteSequence> families; + boolean inclusive; + if (lgName == null) { + // this is the default locality group, create a set of all families not in the default group + Set<ByteSequence> nonDefaultFamilies = new HashSet<>(); + for (Entry<String,ArrayList<ByteSequence>> entry : localityGroupCF.entrySet()) { + if (entry.getKey() != null) { + nonDefaultFamilies.addAll(entry.getValue()); + } + } + + families = nonDefaultFamilies; + inclusive = false; + } else { + families = localityGroupCF.get(lgName); + inclusive = true; + } + + reader.seek(range, families, inclusive); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java index 89a63d1..7cb0ed6 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java @@ -21,15 +21,17 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.log4j.Level; import org.apache.log4j.Logger; import 
org.junit.After; @@ -93,9 +95,8 @@ public class RFileMetricsTest { reader.registerMetrics(vmg); Map<String,ArrayList<ByteSequence>> localityGroupCF = reader.getLocalityGroupCF(); - for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) { - - reader.seek(new Range((Key) null, (Key) null), cf.getValue(), true); + for (String lgName : localityGroupCF.keySet()) { + LocalityGroupUtil.seek(reader, new Range(), lgName, localityGroupCF); while (reader.hasTop()) { reader.next(); } @@ -507,4 +508,63 @@ public class RFileMetricsTest { trf.closeReader(); } + @Test + public void testManyFamiliesInDefaultLocGroup() throws IOException { + trf.openWriter(false, 1024); + + String fam1 = String.format("%06x", 9000); + String fam2 = String.format("%06x", 9001); + + Set<ByteSequence> lg1 = new HashSet<>(); + lg1.add(new ArrayByteSequence(fam1)); + lg1.add(new ArrayByteSequence(fam2)); + + trf.writer.startNewLocalityGroup("lg1", lg1); + + for (int row = 0; row < 1100; row++) { + String rs = String.format("%06x", row); + trf.writer.append(new Key(rs, fam1, "q4", "A", 42l), new Value("v".getBytes())); + trf.writer.append(new Key(rs, fam2, "q4", "A|B", 42l), new Value("v".getBytes())); + } + + trf.writer.startDefaultLocalityGroup(); + + String vis[] = new String[] {"A", "A&B", "A|C", "B&C", "Boo"}; + + int fam = 0; + for (int row = 0; row < 1000; row++) { + String rs = String.format("%06x", row); + for (int v = 0; v < 5; v++) { + String fs = String.format("%06x", fam++); + trf.writer.append(new Key(rs, fs, "q4", vis[v], 42l), new Value("v".getBytes())); + } + } + + trf.closeWriter(); + + trf.openReader(false); + + VisMetricsGatherer vmg = trf.gatherMetrics(); + + Map<String,Long> expected = new HashMap<>(); + Map<String,Long> expectedBlocks = new HashMap<>(); + for (String v : vis) { + expected.put(v, 1000l); + expectedBlocks.put(v, 67l); + } + assertEquals(expected, vmg.metric.get(null).asMap()); + assertEquals(expectedBlocks, vmg.blocks.get(null).asMap()); + + 
expected.clear(); + expectedBlocks.clear(); + expected.put("A", 1100l); + expected.put("A|B", 1100l); + expectedBlocks.put("A", 32l); + expectedBlocks.put("A|B", 32l); + assertEquals(expected, vmg.metric.get("lg1").asMap()); + assertEquals(expectedBlocks, vmg.blocks.get("lg1").asMap()); + + assertEquals(2, vmg.metric.keySet().size()); + assertEquals(2, vmg.blocks.keySet().size()); + } }