KYLIN-2192 More Robust Global Dictionary
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a0ee798 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a0ee798 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a0ee798 Branch: refs/heads/master Commit: 4a0ee7989d5f8272592b980fce3f5716ca40d4c1 Parents: e562aaf Author: sunyerui <sunye...@gmail.com> Authored: Mon Nov 21 21:26:34 2016 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Fri Dec 2 13:33:59 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/dataGen/FactTableGenerator.java | 12 +- .../apache/kylin/common/KylinConfigBase.java | 8 + .../apache/kylin/common/util/Dictionary.java | 2 +- .../model/validation/rule/DictionaryRule.java | 78 +- .../validation/rule/DictionaryRuleTest.java | 28 +- .../apache/kylin/dict/AppendTrieDictionary.java | 285 +++++-- .../kylin/dict/AppendTrieDictionaryChecker.java | 102 +++ .../org/apache/kylin/dict/CachedTreeMap.java | 260 +++--- .../kylin/dict/GlobalDictionaryBuilder.java | 36 +- .../kylin/dict/AppendTrieDictionaryTest.java | 150 +++- .../apache/kylin/dict/CachedTreeMapTest.java | 320 +++++--- .../kylin/measure/bitmap/BitmapCounterTest.java | 6 +- ...t_kylin_cube_without_slr_left_join_desc.json | 16 +- .../localmeta/data/DEFAULT.TEST_KYLIN_FACT.csv | 804 +++++++++---------- .../flatten_data_for_without_slr_left_join.csv | 804 +++++++++---------- .../test_kylin_inner_join_model_desc.json | 3 +- .../test_kylin_inner_join_view_model_desc.json | 3 +- .../test_kylin_left_join_model_desc.json | 3 +- .../test_kylin_left_join_view_model_desc.json | 3 +- .../table/DEFAULT.TEST_KYLIN_FACT.json | 8 +- .../source/hive/ITHiveTableReaderTest.java | 2 +- .../query/sql_distinct_precisely/query00.sql | 2 +- .../query/sql_distinct_precisely/query01.sql | 2 +- .../query/sql_distinct_precisely/query02.sql | 2 +- .../query/sql_distinct_precisely/query03.sql | 3 +- .../query/sql_distinct_precisely/query04.sql | 3 +- .../query/sql_distinct_precisely/query05.sql | 25 - .../query/sql_distinct_precisely/query06.sql | 26 - .../query/sql_distinct_precisely/query07.sql | 24 - 29 files changed, 1737 insertions(+), 1283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java index 8068fd1..677b713 100644 --- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java +++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java @@ -403,13 +403,13 @@ public class FactTableGenerator { } private String createRandomCell(ColumnDesc cDesc) { - String type = cDesc.getTypeName(); - String s = type.toLowerCase(); - if (s.equals("string") || s.equals("char") || s.equals("varchar")) { + DataType type =cDesc.getType(); + String s = type.getName(); + if (s.equals("char") || s.equals("varchar")) { StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 2; i++) { - sb.append((char) ('a' + r.nextInt(10)));// there are 10*10 - // possible strings + int len = Math.min(type.getPrecision(), 3); + for (int i = 0; i < len; i++) { + sb.append((char) ('a' + r.nextInt(10))); // cardinality at most 10x10x10 } return sb.toString(); } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3c10826..f46c185 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -232,6 +232,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000")); } + public int getAppendDictMaxVersions() { + return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3")); + } + + public int getAppendDictVersionTTL() { + return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000")); + } + // for test public void setAppendDictEntrySize(int entrySize) { setProperty("kylin.dictionary.append-entry-size", String.valueOf(entrySize)); http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java index 0fb299c..1e172bc 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java @@ -158,7 +158,7 @@ abstract public class Dictionary<T> implements Serializable { return nullId(); else { int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag); - if (id < 0) + if (id == -1) throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!"); return id; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java index d06c816..37889c2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java @@ -18,9 +18,12 @@ package org.apache.kylin.cube.model.validation.rule; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; @@ -29,9 +32,19 @@ import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.metadata.model.TblColRef; /** - * Created by sunyerui on 16/6/1. + * Validate Dictionary Settings: + * + * <ul> + * <li> no duplicated dictionary for one column + * <li> dictionary can't set both `reuse` and `builder` + * <li> transitive `reuse` like "a <- b <- c" is not allowed, force "a <- b, a <- c" + * </ul> */ public class DictionaryRule implements IValidatorRule<CubeDesc> { + static final String ERROR_DUPLICATE_DICTIONARY_COLUMN = "Duplicated dictionary specification for column: "; + static final String ERROR_REUSE_BUILDER_BOTH_SET = "REUSE and BUILDER both set on dictionary for column: "; + static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: "; + static final String ERROR_TRANSITIVE_REUSE = "Transitive REUSE is not allowed for dictionary: "; @Override public void validate(CubeDesc cubeDesc, ValidateContext context) { @@ -40,40 +53,43 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> { return; } - HashMap<TblColRef, String> colToBuilderMap = new HashMap<>(); - HashMap<TblColRef, TblColRef> colToReuseColMap = new HashMap<>(); + Set<TblColRef> allDictCols = new HashSet<>(); + Set<TblColRef> baseCols = new HashSet<>(); // col with builder + List<DictionaryDesc> reuseDictionaries = new ArrayList<>(); + + // first pass for (DictionaryDesc dictDesc : dictDescs) { TblColRef dictCol = dictDesc.getColumnRef(); - if (dictCol == null) { - context.addResult(ResultLevel.ERROR, "Some column in dictionaries not found"); + TblColRef reuseCol = dictDesc.getResuseColumnRef(); + String builderClass = dictDesc.getBuilderClass(); + + if (!allDictCols.add(dictCol)) { + context.addResult(ResultLevel.ERROR, ERROR_DUPLICATE_DICTIONARY_COLUMN + dictCol); return; } - String builder = dictDesc.getBuilderClass(); - TblColRef reuseCol = dictDesc.getResuseColumnRef(); - if (reuseCol == null) { - if (builder == null || builder.isEmpty()) { - context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both empty"); - return; - } - - // Make sure the same column associate with same builder class - String oldBuilder = colToBuilderMap.put(dictCol, builder); - if (oldBuilder != null && !oldBuilder.equals(builder)) { - context.addResult(ResultLevel.ERROR, "Column " + dictCol + " has inconsistent builders " + builder + " and " + oldBuilder); - return; - } + + if (reuseCol != null && StringUtils.isNotEmpty(builderClass)) { + context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_SET + dictCol); + return; + } + + if (reuseCol == null && StringUtils.isEmpty(builderClass)) { + context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol); + return; + } + + if (reuseCol != null) { + reuseDictionaries.add(dictDesc); } else { - if (builder != null && !builder.isEmpty()) { - context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both"); - return; - } - - // Make sure one column only reuse another one column - TblColRef oldReuseCol = colToReuseColMap.put(dictCol, reuseCol); - if (oldReuseCol != null && !reuseCol.equals(oldReuseCol)) { - context.addResult(ResultLevel.ERROR, "Column " + dictCol + " reuse inconsistent column " + reuseCol + " and " + oldReuseCol); - return; - } + baseCols.add(dictCol); + } + } + + // second pass: check no transitive reuse + for (DictionaryDesc dictDesc : reuseDictionaries) { + if (!baseCols.contains(dictDesc.getResuseColumnRef())) { + context.addResult(ResultLevel.ERROR, ERROR_TRANSITIVE_REUSE + dictDesc.getColumnRef()); + return; } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java index 9b37507..b6e0bcb 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java @@ -18,6 +18,10 @@ package org.apache.kylin.cube.model.validation.rule; +import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN; +import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY; +import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET; +import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,9 +40,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * Created by sunyerui on 16/6/1. - */ public class DictionaryRuleTest extends LocalFileMetadataTestCase { private static KylinConfig config; @@ -65,24 +66,31 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase { desc.init(config); ValidateContext vContext = new ValidateContext(); rule.validate(desc, vContext); - vContext.print(System.out); assertTrue(vContext.getResults().length == 0); } } @Test public void testBadDesc() throws IOException { - testDictionaryDesc("Column EDW.TEST_SITES.SITE_NAME has inconsistent builders " + "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder", DictionaryDesc.create("SITE_NAME", null, "FakeBuilderClass")); + testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, "FakeBuilderClass")); + testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, GlobalDictionaryBuilder.class.getName())); } @Test public void testBadDesc2() throws IOException { - testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both", DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass")); + testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_SET, DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass")); } @Test public void testBadDesc3() throws IOException { - testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both empty", DictionaryDesc.create("lstg_site_id", null, null)); + testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_EMPTY, DictionaryDesc.create("lstg_site_id", null, null)); + } + + @Test + public void testBadDesc4() throws IOException { + testDictionaryDesc(ERROR_TRANSITIVE_REUSE, + DictionaryDesc.create("lstg_site_id", "USER_ID", null), + DictionaryDesc.create("price", "lstg_site_id", null)); } @Test @@ -102,13 +110,13 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase { desc.init(config); ValidateContext vContext = new ValidateContext(); rule.validate(desc, vContext); - vContext.print(System.out); if (expectMessage == null) { assertTrue(vContext.getResults().length == 0); } else { - assertTrue(vContext.getResults().length >= 1); - assertEquals(expectMessage, vContext.getResults()[0].getMessage()); + assertTrue(vContext.getResults().length == 1); + String actualMessage = vContext.getResults()[0].getMessage(); + assertTrue(actualMessage.startsWith(expectMessage)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java index 14980bf..84060a7 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java @@ -31,10 +31,13 @@ import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; +import java.util.NavigableSet; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -49,6 +52,8 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.MetadataManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +62,16 @@ import org.slf4j.LoggerFactory; * int IDs, used for global dictionary. * * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size. - * + * * With Trie the memory footprint of the mapping is kinda minimized at the cost * CPU, if compared to HashMap of ID Arrays. Performance test shows Trie is * roughly 10 times slower, so there's a cache layer overlays on top of Trie and * gracefully fall back to Trie using a weak reference. - * + * * The implementation is NOT thread-safe for now. * * TODO making it thread-safe - * + * * @author sunyerui */ @SuppressWarnings({ "rawtypes", "unchecked", "serial" }) @@ -87,7 +92,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { transient private int nValues; transient private BytesConverter<T> bytesConverter; - private TreeMap<DictSliceKey, DictSlice> dictSliceMap; + volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap; transient private boolean enableValueCache = true; transient private SoftReference<HashMap> valueToIdCache; @@ -99,17 +104,23 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { } } - public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap dictMap) throws IOException { + public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException { this.baseDir = baseDir; this.baseId = baseId; this.maxId = maxId; this.maxValueLength = maxValueLength; this.nValues = nValues; this.bytesConverter = bytesConverter; + } + public void initDictSliceMap(CachedTreeMap dictMap) throws IOException { int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize(); - dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); - ((CachedTreeMap)dictSliceMap).loadEntry(dictMap); + int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); + CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir) + .immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); + newDictSliceMap.loadEntry(dictMap); + this.dictSliceMap = newDictSliceMap; } public byte[] writeDictMap() throws IOException { @@ -123,6 +134,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { return dictMapBytes; } + // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state + public static void checkValidId(int id) { + if (id == 0 || id == -1) { + throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); + } + } + public static class DictSliceKey implements WritableComparable { byte[] key; @@ -181,7 +199,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { transient private int nValues; transient private int sizeOfId; - transient private int childOffsetMask; + // mask MUST be long, since childOffset maybe 5 bytes at most + transient private long childOffsetMask; transient private int firstByteOffset; private void init(byte[] trieBytes) { @@ -197,7 +216,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { this.sizeChildOffset = headIn.read(); this.sizeOfId = headIn.read(); - this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8)); + this.childOffsetMask = ~(((long)(BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8)); this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte } catch (Exception e) { if (e instanceof RuntimeException) @@ -216,7 +235,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) { break; } - nodeOffset = headSize + (BytesUtil.readUnsigned(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); + nodeOffset = headSize + (int)(BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); if (nodeOffset == headSize) { break; } @@ -258,7 +277,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { } // find a child to continue - int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask); + int c = headSize + (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); if (c == headSize) // has no children return -1; byte inpByte = inp[o]; @@ -297,7 +316,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { DictNode root = null; while (true) { int p = n + firstByteOffset; - int childOffset = BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask; + int childOffset = (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); @@ -329,6 +348,53 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { return root; } + public boolean doCheck() { + int offset = headSize; + HashSet<Integer> parentSet = new HashSet<>(); + boolean lastChild = false; + + while (offset < trieBytes.length) { + if (lastChild) { + boolean contained = parentSet.remove(offset - headSize); + // Can't find parent, the data is corrupted + if (!contained) { + return false; + } + lastChild = false; + } + int p = offset + firstByteOffset; + int childOffset = (int)(BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask); + int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); + boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE); + + // Copy value overflow, the data is corrupted + if (trieBytes.length < p + parLen) { + return false; + } + + // Check id is fine + if (isEndOfValue) { + BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); + } + + // Record it if has children + if (childOffset != 0) { + parentSet.add(childOffset); + } + + // brothers done, move to next parent + if (checkFlag(offset, BIT_IS_LAST_CHILD)) { + lastChild = true; + } + + // move to next node + offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0); + } + + // ParentMap is empty, meaning all nodes has parent, the data is correct + return parentSet.isEmpty(); + } + public void write(DataOutput out) throws IOException { out.write(trieBytes); } @@ -341,7 +407,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { throw new IllegalArgumentException("Wrong file type (magic does not match)"); DataInputStream headIn = new DataInputStream(// - new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); + new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); int headSize = headIn.readShort(); int bodyLen = headIn.readInt(); headIn.close(); @@ -398,6 +464,9 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { this.id = o.id; this.isEndOfValue = o.isEndOfValue; this.children = o.children; + for (DictNode child : o.children) { + child.parent = this; + } this.nValuesBeneath = o.nValuesBeneath; this.parent = o.parent; this.childrenCount = o.childrenCount; @@ -602,7 +671,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { // nValueBytes if (n.part.length > 255) - throw new RuntimeException(); + throw new RuntimeException("Value length is " + n.part.length + + " and larger than 255: " + Bytes.toStringBinary(n.part)); BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); o++; @@ -611,7 +681,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { o += n.part.length; if (n.isEndOfValue) { - assert n.id > 0; + checkValidId(n.id); BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId); o += sizeId; } @@ -715,12 +785,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { s.mbpn_sizeId = 4; s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId; s.mbpn_sizeNoValueBytes = 1; - s.mbpn_sizeChildOffset = 4; + s.mbpn_sizeChildOffset = 5; s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset); while (true) { // minimize the offset size to match the footprint int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1); // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag - if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { + // expand t to long before *4, avoiding exceed Integer.MAX_VALUE + if (BytesUtil.sizeForValue((long)t * 4) <= s.mbpn_sizeChildOffset - 1) { s.mbpn_sizeChildOffset--; s.mbpn_footprint = t; } else @@ -760,31 +831,97 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { } public static class Builder<T> { - private String baseDir; + private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap(); + + public static Builder getInstance(String resourcePath) throws IOException { + return getInstance(resourcePath, null); + } + + public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException { + Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath); + if (entry == null) { + entry = new Pair<>(0, createNewBuilder(resourcePath, dict)); + builderInstanceAndCountMap.put(resourcePath, entry); + } + entry.setFirst(entry.getFirst() + 1); + return entry.getSecond(); + } + + // return true if entry still in map + private synchronized static boolean releaseInstance(String resourcePath) { + Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath); + if (entry != null) { + entry.setFirst(entry.getFirst() - 1); + if (entry.getFirst() <= 0) { + builderInstanceAndCountMap.remove(resourcePath); + return false; + } + return true; + } + return false; + } + + public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException { + String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/"; + + AppendTrieDictionary dictToUse = existDict; + if (dictToUse == null) { + // Try to load the existing dict from cache, making sure there's only the same one object in memory + NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath); + ArrayList<String> appendDicts = new ArrayList<>(); + if (dicts != null && !dicts.isEmpty()) { + for (String dict : dicts) { + DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); + if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { + appendDicts.add(dict); + } + } + } + if (appendDicts.isEmpty()) { + dictToUse = null; + } else if (appendDicts.size() == 1) { + dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); + } else { + throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size())); + } + } + + AppendTrieDictionary.Builder<String> builder; + if (dictToUse == null) { + logger.info("GlobalDict {} is empty, create new one", resourcePath); + builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null); + } else { + logger.info("GlobalDict {} exist, append value", dictToUse); + builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength, + dictToUse.nValues, dictToUse.bytesConverter, dictToUse.writeDictMap()); + } + + return builder; + } + + private final String resourcePath; + private final String baseDir; private int maxId; private int maxValueLength; private int nValues; - private BytesConverter<T> bytesConverter; + private final BytesConverter<T> bytesConverter; - private AppendTrieDictionary dict; + private final AppendTrieDictionary dict; - private TreeMap<DictSliceKey, DictNode> mutableDictSliceMap; - private static int MAX_ENTRY_IN_SLICE = 10_000_000; + private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap; + private int MAX_ENTRY_IN_SLICE = 10_000_000; private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0; private int processedCount = 0; - public static Builder create(String baseDir) throws IOException { - return new Builder<>(null, baseDir, 0, 0, 0, new StringBytesConverter(), null); - } - - public static Builder create(AppendTrieDictionary dict) throws IOException { - return new Builder<>(dict, dict.baseDir, dict.maxId, dict.maxValueLength, dict.nValues, dict.bytesConverter, dict.writeDictMap()); - } - // Constructor for a new Dict - private Builder(AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException { - this.dict = dict; + private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException { + this.resourcePath = resourcePath; + if (dict == null) { + this.dict = new AppendTrieDictionary<T>(); + } else { + this.dict = dict; + } this.baseDir = baseDir; this.maxId = maxId; this.maxValueLength = maxValueLength; @@ -793,8 +930,11 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize(); + int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); // create a new cached map with baseDir - mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build(); + mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir) + .maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build(); if (dictMapBytes != null) { ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes))); } @@ -804,7 +944,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { addValue(bytesConverter.convertToBytes(value)); } - public void addValue(byte[] value) { + private synchronized void addValue(byte[] value) { if (++processedCount % 1_000_000 == 0) { logger.debug("add value count " + processedCount); } @@ -859,15 +999,41 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { private int createNextId() { int id = ++maxId; - if (maxId < 0) { - throw new IllegalArgumentException("AppendTrieDictionary Id overflow Integer.MAX_VALUE"); - } + checkValidId(id); nValues++; return id; } + // Only used for test + public void setMaxId(int id) { + this.maxId = id; + } + + // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree + private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) { + DictNode head = null; + DictNode current = null; + for (; start + 255 < end; start += 255) { + DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false); + if (head == null) { + head = c; + current = c; + } else { + current.addChild(c); + current = c; + } + } + DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true); + last.id = createNextId(); + if (head == null) { + head = last; + } else { + current.addChild(last); + } + return head; + } + private void addValueR(DictNode node, byte[] value, int start) { - assert value.length - start <= 255 : "value bytes overflow than 255"; // match the value part of current node int i = 0, j = start; int n = node.part.length, nn = value.length; @@ -903,8 +1069,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { if (i < n) { DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); c1.id = node.id; - DictNode c2 = new DictNode(BytesUtil.subarray(value, j, nn), true); - c2.id = createNextId(); + DictNode c2 = addNodeMaybeOverflow(value, j, nn); node.reset(BytesUtil.subarray(node.part, 0, i), false); if (comp < 0) { node.addChild(c1); @@ -940,18 +1105,17 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { addValueR(node.children.get(mid), value, j); } else { // otherwise, make the value a new child - DictNode c = new DictNode(BytesUtil.subarray(value, j, nn), true); - c.id = createNextId(); + DictNode c = addNodeMaybeOverflow(value, j, nn); node.addChild(comp <= 0 ? mid : mid + 1, c); } } - public AppendTrieDictionary<T> build(int baseId) throws IOException { - if (dict == null) { - dict = new AppendTrieDictionary<T>(); - } - dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)mutableDictSliceMap); - dict.flushIndex((CachedTreeMap) mutableDictSliceMap); + public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException { + boolean keepAppend = releaseInstance(resourcePath); + CachedTreeMap dictSliceMap = (CachedTreeMap)mutableDictSliceMap; + dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter); + dict.flushIndex(dictSliceMap, keepAppend); + dict.initDictSliceMap(dictSliceMap); return dict; } @@ -970,8 +1134,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { } DictSlice slice = dictSliceMap.get(sliceKey); int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag); - if (id < 0) - logger.error("Not a valid value: " + bytesConverter.convertFromBytes(value, offset, len)); return id; } @@ -1031,25 +1193,24 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id"); } - public void flushIndex(CachedTreeMap dictSliceMap) throws IOException { - Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index"); - Configuration conf = new Configuration(); - try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) { + public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException { + try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) { indexOut.writeInt(baseId); indexOut.writeInt(maxId); indexOut.writeInt(maxValueLength); indexOut.writeInt(nValues); indexOut.writeUTF(bytesConverter.getClass().getName()); dictSliceMap.write(indexOut); + dictSliceMap.commit(keepAppend); } - dictSliceMap.commit(false); } @Override public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { Configuration conf = new Configuration(); AppendTrieDictionary newDict = new AppendTrieDictionary(); - newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)dictSliceMap); + newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter); + newDict.initDictSliceMap((CachedTreeMap)dictSliceMap); logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir); Path srcPath = new Path(this.baseDir); Path dstPath = new Path(newDict.baseDir); @@ -1071,9 +1232,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { @Override public void readFields(DataInput in) throws IOException { String baseDir = in.readUTF(); - Path filePath = new Path(baseDir + "/.index"); Configuration conf = new Configuration(); - try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, 8 * 1024 * 1024)) { + try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) { int baseId = input.readInt(); int maxId = input.readInt(); int maxValueLength = input.readInt(); @@ -1087,10 +1247,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { throw new IOException(e); } } + initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter); + + // Create instance for deserialize data, and update to map in dict CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder() - .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); + .baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); dictMap.readFields(input); - update(baseDir, baseId, maxId, maxValueLength, nValues, converter, dictMap); + initDictSliceMap(dictMap); } } @@ -1120,4 +1283,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> { public boolean contains(Dictionary other) { return false; } + } + http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java new file mode 100644 index 0000000..f231275 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.dict; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by sunyerui on 16/11/15. + */ +public class AppendTrieDictionaryChecker { + + public boolean runChecker(String baseDir) throws IOException { + Configuration conf = new Configuration(); + Path basePath = new Path(baseDir); + FileSystem fs = FileSystem.get(basePath.toUri(), conf); + List<Path> sliceList = new ArrayList<>(); + List<Path> corruptedSliceList = new ArrayList<>(); + listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList); + + for (Path path : sliceList) { + if (!doCheck(fs, path)) { + System.out.println("AppendDict Slice " + path + " corrupted"); + corruptedSliceList.add(path); + } else { + System.out.println("AppendDict Slice " + path + " is right"); + } + } + + if (corruptedSliceList.isEmpty()) { + System.out.println("ALL AppendDict Slices is right"); + return true; + } else { + System.out.println("Some AppendDict Slice(s) corrupted: "); + for (Path path : corruptedSliceList) { + System.out.println(path.toString()); + } + return false; + } + } + + public void listDictSlicePath(FileSystem fs, FileStatus path, List<Path> list) throws IOException { + if (path.isDirectory()) { + for (FileStatus status : fs.listStatus(path.getPath())) { + listDictSlicePath(fs, status, list); + } + } else { + if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) { + list.add(path.getPath()); + } + } + } + + public boolean doCheck(FileSystem fs, Path filePath) { + try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) { + AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice(); + slice.readFields(input); + return slice.doCheck(); + } catch (Exception e) { + return false; + } catch (Error e) { + return false; + } + } + + public static void main(String[] args) throws IOException { + String path = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/"; + if (args.length > 0) { + path = args[0]; + } + System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path); + AppendTrieDictionaryChecker checker = new AppendTrieDictionaryChecker(); + if (checker.runChecker(path)) { + System.exit(0); + } else { + System.exit(-1); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java index 1ea3c1c..6acf764 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java @@ -31,9 +31,11 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.slf4j.Logger; @@ -56,25 +58,29 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext private final Class<V> valueClazz; transient volatile Collection<V> values; private final LoadingCache<K, V> valueCache; - private final TreeSet<String> fileList; private final Configuration conf; - private final String baseDir; - private final String tmpDir; + private final Path baseDir; + private final Path versionDir; + private final Path workingDir; private final FileSystem fs; - private final boolean persistent; private final boolean immutable; - private long writeValueTime = 0; - private long readValueTime = 0; + private final int maxVersions; + private final long versionTTL; + private boolean keepAppend; - private static final int BUFFER_SIZE = 8 * 1024 * 1024; + public static final int BUFFER_SIZE = 8 * 1024 * 1024; + + public static final String CACHED_PREFIX = "cached_"; + public static final String VERSION_PREFIX = "version_"; public static class CachedTreeMapBuilder<K, V> { private Class<K> keyClazz; private Class<V> valueClazz; private int maxCount = 8; private String baseDir; - private boolean persistent; private boolean immutable; + private int maxVersions; + private long versionTTL; public static CachedTreeMapBuilder newBuilder() { return new CachedTreeMapBuilder(); @@ -103,13 +109,18 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext return this; } - public CachedTreeMapBuilder<K, V> persistent(boolean persistent) { - this.persistent = persistent; + public CachedTreeMapBuilder<K, V> immutable(boolean immutable) { + this.immutable = immutable; return this; } - public CachedTreeMapBuilder<K, V> immutable(boolean immutable) { - this.immutable = immutable; + public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) { + this.maxVersions = maxVersions; + return this; + } + + public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) { + this.versionTTL = versionTTL; return this; } @@ -120,26 +131,38 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext if (keyClazz == null || valueClazz == null) { throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data"); } - CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, persistent, immutable); + CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL); return map; } } - private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) throws IOException { + private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath, + boolean immutable, int maxVersions, long versionTTL) throws IOException { super(); this.keyClazz = keyClazz; this.valueClazz = valueClazz; - this.fileList = new TreeSet<>(); + this.immutable = immutable; + this.keepAppend = true; + this.maxVersions = maxVersions; + this.versionTTL = versionTTL; this.conf = new Configuration(); - if (baseDir.endsWith("/")) { - this.baseDir = baseDir.substring(0, baseDir.length()-1); - } else { - this.baseDir = baseDir; + if (basePath.endsWith("/")) { + basePath = basePath.substring(0, basePath.length()-1); + } + this.baseDir = new Path(basePath); + this.fs = FileSystem.get(baseDir.toUri(), conf); + if (!fs.exists(baseDir)) { + fs.mkdirs(baseDir); + } + this.versionDir = getLatestVersion(conf, fs, baseDir); + this.workingDir = new Path(baseDir, "working"); + if (!this.immutable) { + // For mutable map, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt + if (fs.exists(workingDir)) { + fs.delete(workingDir, true); + } + FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf); } - this.tmpDir = this.baseDir + ".tmp"; - this.fs = FileSystem.get(new Path(baseDir).toUri(), conf); - this.persistent = persistent; - this.immutable = immutable; CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() { @Override public void onRemoval(RemovalNotification<K, V> notification) { @@ -152,24 +175,14 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext deleteValue(notification.getKey()); break; default: - throw new RuntimeException("unexpected evict reason " + notification.getCause()); } } }); - // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc if (this.immutable) { + // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc builder.softValues(); } else { builder.maximumSize(maxCount); - // For mutable map, copy all data into tmp and modify on tmp data, avoiding suddenly server crash made data corrupt - if (fs.exists(new Path(tmpDir))) { - fs.delete(new Path(tmpDir), true); - } - if (fs.exists(new Path(this.baseDir))) { - FileUtil.copy(fs, new Path(this.baseDir), fs, new Path(tmpDir), false, true, conf); - } else { - fs.mkdirs(new Path(this.baseDir)); - } } this.valueCache = builder.build(new CacheLoader<K, V>() { @Override @@ -182,38 +195,108 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext } private String generateFileName(K key) { - String file = (immutable ? baseDir : tmpDir) + "/cached_" + key.toString(); + String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString(); return file; } - public String getCurrentDir() { - return immutable ? baseDir : tmpDir; + private String getCurrentDir() { + return immutable ? versionDir.toString() : workingDir.toString(); } - public void commit(boolean stillMutable) throws IOException { - assert !immutable : "Only support commit method with immutable false"; + private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException { + FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith(VERSION_PREFIX)) { + return true; + } + return false; + } + }); + TreeSet<String> versions = new TreeSet<>(); + for (FileStatus status : fileStatus) { + versions.add(status.getPath().toString()); + } + return versions.toArray(new String[versions.size()]); + } - Path basePath = new Path(baseDir); - Path backupPath = new Path(baseDir+".bak"); - Path tmpPath = new Path(tmpDir); - try { - fs.rename(basePath, backupPath); - } catch (IOException e) { - logger.info("CachedTreeMap commit backup basedir failed, " + e, e); - throw e; + // only for test + public String getLatestVersion() throws IOException { + return getLatestVersion(conf, fs, baseDir).toUri().getPath(); + } + + private static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException { + String[] versions = listAllVersions(fs, baseDir); + if (versions.length > 0) { + return new Path(versions[versions.length - 1]); + } else { + // Old format, directly use base dir, convert to new format + Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis()); + Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis()); + Path indexFile = new Path(baseDir, ".index"); + FileStatus[] cachedFiles; + try { + cachedFiles = fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith(CACHED_PREFIX)) { + return true; + } + return false; + } + }); + fs.mkdirs(tmpNewVersionDir); + if (fs.exists(indexFile) && cachedFiles.length > 0) { + FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf); + for (FileStatus file : cachedFiles) { + FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf); + } + } + fs.rename(tmpNewVersionDir, newVersionDir); + if (fs.exists(indexFile) && cachedFiles.length > 0) { + fs.delete(indexFile, true); + for (FileStatus file : cachedFiles) { + fs.delete(file.getPath(), true); + } + } + } finally { + if (fs.exists(tmpNewVersionDir)) { + fs.delete(tmpNewVersionDir, true); + } + } + return newVersionDir; } + } - try { - if (stillMutable) { - FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf); - } else { - fs.rename(tmpPath, basePath); + public void commit(boolean keepAppend) throws IOException { + assert this.keepAppend & !immutable : "Only support commit method with immutable false and keepAppend true"; + + Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis()); + if (keepAppend) { + // Copy to tmp dir, and rename to new version, make sure it's complete when be visible + Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis()); + try { + FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf); + fs.rename(tmpNewVersionDir, newVersionDir); + } finally { + if (fs.exists(tmpNewVersionDir)) { + fs.delete(tmpNewVersionDir, true); + } + } + } else { + fs.rename(workingDir, newVersionDir); + } + this.keepAppend = keepAppend; + + // Check versions count, delete expired versions + String[] versions = listAllVersions(fs, baseDir); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < versions.length - maxVersions; i++) { + String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length()); + long version = Long.parseLong(versionString); + if (version + versionTTL < timestamp) { + fs.delete(new Path(versions[i]), true); } - fs.delete(backupPath, true); - } catch (IOException e) { - fs.rename(backupPath, basePath); - logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, e); - throw e; } } @@ -227,25 +310,17 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext if (immutable) { return; } - long t0 = System.currentTimeMillis(); String fileName = generateFileName(key); Path filePath = new Path(fileName); try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8)) { value.write(out); - if (!persistent) { - fs.deleteOnExit(filePath); - } } catch (Exception e) { logger.error(String.format("write value into %s exception: %s", fileName, e), e); throw new RuntimeException(e.getCause()); - } finally { - fileList.add(fileName); - writeValueTime += System.currentTimeMillis() - t0; } } private V readValue(K key) throws Exception { - long t0 = System.currentTimeMillis(); String fileName = generateFileName(key); Path filePath = new Path(fileName); try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) { @@ -255,13 +330,11 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext } catch (Exception e) { logger.error(String.format("read value from %s exception: %s", fileName, e), e); return null; - } finally { - readValueTime += System.currentTimeMillis() - t0; } } private void deleteValue(K key) { - if (persistent && immutable) { + if (immutable) { return; } String fileName = generateFileName(key); @@ -272,14 +345,12 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext } } catch (Exception e) { logger.error(String.format("delete value file %s exception: %s", fileName, e), e); - } finally { - fileList.remove(fileName); } } @Override public V put(K key, V value) { - assert !immutable : "Only support put method with immutable false"; + assert keepAppend & !immutable : "Only support put method with immutable false and keepAppend true"; super.put(key, null); valueCache.put(key, value); return null; @@ -301,7 +372,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext @Override public V remove(Object key) { - assert !immutable : "Only support remove method with immutable false"; + assert keepAppend & !immutable : "Only support remove method with immutable false keepAppend true"; super.remove(key); valueCache.invalidate(key); return null; @@ -357,15 +428,32 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext @Override public void remove() { - assert !immutable : "Only support remove method with immutable false"; + assert keepAppend & !immutable : "Only support remove method with immutable false and keepAppend true"; keyIterator.remove(); valueCache.invalidate(currentKey); } } + public FSDataOutputStream openIndexOutput() throws IOException { + assert keepAppend & !immutable : "Only support write method with immutable false and keepAppend true"; + Path indexPath = new Path(getCurrentDir(), ".index"); + return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8); + } + + public FSDataInputStream openIndexInput() throws IOException { + Path indexPath = new Path(getCurrentDir(), ".index"); + return fs.open(indexPath, 8 * 1024 * 1024); + } + + public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = FileSystem.get(basePath.toUri(), conf); + Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index"); + return fs.open(indexPath, 8 * 1024 * 1024); + } + @Override public void write(DataOutput out) throws IOException { - assert persistent : "Only support serialize with persistent true"; out.writeInt(size()); for (K key : keySet()) { key.write(out); @@ -378,7 +466,6 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext @Override public void readFields(DataInput in) throws IOException { - assert persistent : "Only support deserialize with persistent true"; int size = in.readInt(); try { for (int i = 0; i < size; i++) { @@ -390,27 +477,4 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext throw new IOException(e); } } - - // clean up all tmp files - @Override - public void finalize() throws Throwable { - if (persistent) { - return; - } - try { - this.clear(); - for (String file : fileList) { - try { - Path filePath = new Path(file); - fs.delete(filePath, true); - } catch (Throwable t) { - //do nothing? - } - } - } catch (Throwable t) { - //do nothing - } finally { - super.finalize(); - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index b2a3664..cda3c2b 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -19,14 +19,8 @@ package org.apache.kylin.dict; import java.io.IOException; -import java.util.ArrayList; -import java.util.NavigableSet; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.MetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments. @@ -34,41 +28,15 @@ import org.slf4j.LoggerFactory; * Created by sunyerui on 16/5/24. */ public class GlobalDictionaryBuilder implements IDictionaryBuilder { - private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); - AppendTrieDictionary.Builder<String> builder; int baseId; - + @Override public void init(DictionaryInfo dictInfo, int baseId) throws IOException { if (dictInfo == null) { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } - String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/"; - - // Try to load the existing dict from cache, making sure there's only the same one object in memory - NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir()); - ArrayList<String> appendDicts = new ArrayList<>(); - if (dicts != null && !dicts.isEmpty()) { - for (String dict : dicts) { - DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); - if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { - appendDicts.add(dict); - } - } - } - - if (appendDicts.isEmpty()) { - logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir()); - this.builder = AppendTrieDictionary.Builder.create(dictDir); - } else if (appendDicts.size() == 1) { - logger.info("GlobalDict {} exist, append value", appendDicts.get(0)); - AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); - this.builder = AppendTrieDictionary.Builder.create(dict); - } else { - throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size())); - } - + this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir()); this.baseId = baseId; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java index 5e1705a..a7e8152 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -20,13 +20,15 @@ package org.apache.kylin.dict; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -35,6 +37,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Random; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +55,7 @@ import org.junit.Test; public class AppendTrieDictionaryTest { public static final String BASE_DIR = "/tmp/kylin_append_dict"; + public static final String RESOURCE_DIR = "/dict/append_dict_test"; @BeforeClass public static void setUp() { @@ -64,22 +69,28 @@ public class AppendTrieDictionaryTest { @AfterClass public static void tearDown() { - String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); + cleanup(); + } + +// @After + public void afterTest() { + cleanup(); + } + + public static void cleanup() { + Configuration conf = new Configuration(); + Path basePath = new Path(BASE_DIR); try { - FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true); - } catch (IOException e) { - } - File tmpLocalDir = new File(BASE_DIR); - if (tmpLocalDir.exists()) { - for (File f : tmpLocalDir.listFiles()) { - f.delete(); - } - tmpLocalDir.delete(); - } + FileSystem.get(basePath.toUri(), conf).delete(basePath, true); + } catch (IOException e) {} } public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "åå ¸", "åå ¸æ ", "忝", // non-ascii characters "", // empty + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", "paint", "tar", "try", // some dup }; @@ -131,8 +142,7 @@ public class AppendTrieDictionaryTest { @Ignore("need huge key set") @Test public void testHugeKeySet() throws IOException { - BytesConverter converter = new StringBytesConverter(); - AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR); + AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); AppendTrieDictionary<String> dict = null; InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); @@ -162,7 +172,7 @@ public class AppendTrieDictionaryTest { } BytesConverter converter = new StringBytesConverter(); - AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR); + AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); AppendTrieDictionary<String> dict = null; TreeMap<Integer, String> checkMap = new TreeMap<>(); int firstAppend = rnd.nextInt(strList.size() / 2); @@ -179,12 +189,14 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); checkMap.put(id, str); } // reopen dict and append - b = AppendTrieDictionary.Builder.create(dict); +// b = AppendTrieDictionary.Builder.create(dict); + b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); for (; appendIndex < secondAppend; appendIndex++) { b.addValue(strList.get(appendIndex)); } @@ -197,6 +209,7 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); if (checkIndex < firstAppend) { assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } else { @@ -207,7 +220,7 @@ public class AppendTrieDictionaryTest { } // reopen dict and append rest str - b = AppendTrieDictionary.Builder.create(dict); + b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); for (; appendIndex < strList.size(); appendIndex++) { b.addValue(strList.get(appendIndex)); } @@ -220,6 +233,7 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); if (checkIndex < secondAppend) { assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } else { @@ -240,6 +254,7 @@ public class AppendTrieDictionaryTest { for (String str : strList) { byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } } @@ -260,4 +275,103 @@ public class AppendTrieDictionaryTest { throw new RuntimeException(e); } } -} \ No newline at end of file + + @Test + public void testMaxInteger() throws IOException { + AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); + builder.setMaxId(Integer.MAX_VALUE - 2); + builder.addValue("a"); + builder.addValue("ab"); + builder.addValue("acd"); + builder.addValue("ac"); + AppendTrieDictionary dict = builder.build(0); + assertEquals(2147483646, dict.getIdFromValueImpl("a", 0)); + assertEquals(2147483647, dict.getIdFromValueImpl("ab", 0)); + assertEquals(-2147483647, dict.getIdFromValueImpl("ac", 0)); + assertEquals(-2147483648, dict.getIdFromValueImpl("acd", 0)); + } + + @Ignore("Only occurred when value is very long (>8000 bytes)") + @Test + public void testSuperLongValue() throws IOException { + AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); + String value = "a"; + for (int i = 0; i < 10000; i++) { + value += "a"; + try { + builder.addValue(value); + } catch (StackOverflowError e) { + System.out.println("\nstack overflow " + i); + throw e; + } + } + AppendTrieDictionary dictionary = builder.build(0); + dictionary.getMaxId(); + } + + private static class SharedBuilderThread extends Thread { + CountDownLatch startLatch; + CountDownLatch finishLatch; + String resourcePath; + String prefix; + int count; + + SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) { + this.startLatch = startLatch; + this.finishLatch = finishLatch; + this.resourcePath = resourcePath; + this.prefix = prefix; + this.count = count; + } + + @Override + public void run() { + try { + AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath); + startLatch.countDown(); + for (int i = 0; i < count; i++) { + builder.addValue(prefix + i); + } + builder.build(0); + finishLatch.countDown(); + } catch (IOException e) {} + } + } + + @Test + public void testSharedBuilder() throws IOException, InterruptedException { + String resourcePath = "shared_builder"; + final CountDownLatch startLatch = new CountDownLatch(3); + final CountDownLatch finishLatch = new CountDownLatch(3); + + AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath); + Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000); + Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10); + Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000); + t1.start(); + t2.start(); + t3.start(); + startLatch.await(); + AppendTrieDictionary dict = builder.build(0); + assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS)); + assertEquals(110010, dict.getMaxId()); + try { + builder.addValue("fail"); + fail("Builder should be closed"); + } catch (Exception e) {} + + builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict); + builder.addValue("success"); + dict = builder.build(0); + for (int i = 0; i < 10000; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); + } + for (int i = 0; i < 10; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); + } + for (int i = 0; i < 100000; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); + } + assertEquals(110011, dict.getIdFromValue("success")); + } +}