KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
Signed-off-by: Yang Li <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0804f95 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0804f95 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0804f95 Branch: refs/heads/KYLIN-2006 Commit: f0804f95ae59ef7adcc0e6e4fe9a3b3620586b96 Parents: ddec049 Author: xiefan46 <958034...@qq.com> Authored: Mon Nov 7 14:37:22 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Tue Nov 8 23:23:34 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/dict/ByteComparator.java | 44 ++ .../kylin/dict/NumberDictionaryForest.java | 278 ++++++++ .../dict/NumberDictionaryForestBuilder.java | 58 ++ .../apache/kylin/dict/TrieDictionaryForest.java | 406 ++++++++++++ .../kylin/dict/TrieDictionaryForestBuilder.java | 125 ++++ .../kylin/dict/TrieDictionaryForestTest.java | 657 +++++++++++++++++++ .../fdc2/FactDistinctColumnPartitioner2.java | 47 ++ .../fdc2/FactDistinctColumnsCombiner2.java | 44 ++ .../mr/steps/fdc2/FactDistinctColumnsJob2.java | 149 +++++ .../fdc2/FactDistinctColumnsMapperBase2.java | 102 +++ .../fdc2/FactDistinctHiveColumnsMapper2.java | 232 +++++++ .../mr/steps/fdc2/SelfDefineSortableKey.java | 130 ++++ .../kylin/engine/mr/steps/fdc2/TypeFlag.java | 28 + .../mr/steps/NumberDictionaryForestTest.java | 214 ++++++ .../mr/steps/SelfDefineSortableKeyTest.java | 228 +++++++ 15 files changed, 2742 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java new file mode 100644 index 0000000..74d5ec5 
--- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java @@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.dict;

import org.apache.kylin.common.util.ByteArray;

import java.util.Comparator;

/**
 * Orders values of type {@code T} by their byte form: each value is converted with a
 * {@link BytesConverter}, wrapped in a {@link ByteArray}, and the two wrappers are compared with
 * {@link ByteArray#compareTo}.
 * <p>
 * Created by xiefan on 16-10-28.
 */
public class ByteComparator<T> implements Comparator<T> {

    /** Produces the bytes that stand in for each compared value. */
    private final BytesConverter<T> converter;

    public ByteComparator(BytesConverter<T> converter) {
        this.converter = converter;
    }

    @Override
    public int compare(T o1, T o2) {
        ByteArray left = wrapWhole(converter.convertToBytes(o1));
        ByteArray right = wrapWhole(converter.convertToBytes(o2));
        return left.compareTo(right);
    }

    /** Wraps the complete {@code bytes} array (offset 0, full length). */
    private static ByteArray wrapWhole(byte[] bytes) {
        return new ByteArray(bytes, 0, bytes.length);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java new file mode 100644 index 0000000..8caa4b6 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.kylin.dict;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;

/**
 * A numeric dictionary layered on top of a {@link TrieDictionaryForest}.
 * <p>
 * Every value is re-encoded by {@link NumberBytesCodec} into an order-preserving byte sequence
 * before it is looked up in the underlying forest, and decoded again when it is read back.
 * <p>
 * Notice: the number dictionary forest currently cannot handle very big or very small double
 * and float values such as 4.9E-324.
 * <p>
 * Created by xiefan on 16-11-1.
 */
public class NumberDictionaryForest<T> extends Dictionary<T> {

    /** Maximum number of digits accepted before the decimal point. */
    public static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 19;

    /**
     * Encodes a number into an order-preserving byte sequence, and decodes it back.
     * <ul>
     * <li>positives -- left-padded with '0' and prefixed with one more '0'</li>
     * <li>negatives -- '-' sign, left-padded with '9', digits inverted, terminated by ';'</li>
     * </ul>
     * After {@link #encodeNumber} the encoded form is {@code buf[bufOffset, bufOffset + bufLen)}.
     * The instance keeps this state in mutable fields; the dictionary keeps one codec per thread
     * via {@link #localCodec}.
     */
    static class NumberBytesCodec {
        int maxDigitsBeforeDecimalPoint;
        byte[] buf;
        int bufOffset;
        int bufLen;

        NumberBytesCodec(int maxDigitsBeforeDecimalPoint) {
            this.maxDigitsBeforeDecimalPoint = maxDigitsBeforeDecimalPoint;
            this.buf = new byte[maxDigitsBeforeDecimalPoint * 3];
            this.bufOffset = 0;
            this.bufLen = 0;
        }

        /**
         * Encodes {@code value[offset, offset + len)} (an ASCII decimal number) into {@link #buf}.
         *
         * @throws IllegalArgumentException if the number does not fit the internal buffer or has
         *             more than {@code maxDigitsBeforeDecimalPoint} digits before the decimal point
         */
        void encodeNumber(byte[] value, int offset, int len) {
            if (len == 0) {
                bufOffset = 0;
                bufLen = 0;
                return;
            }

            boolean negative = value[offset] == '-';

            // a negative number needs one extra byte for its ';' terminator
            int needed = negative ? len + 1 : len;
            if (needed > buf.length) {
                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Internal buffer is only " + buf.length + " bytes");
            }

            // terminate negative ';'
            int start = buf.length - len;
            int end = buf.length;
            if (negative) {
                start--;
                end--;
                buf[end] = ';';
            }

            // copy & find decimal point
            int decimalPoint = end;
            for (int i = start, j = offset; i < end; i++, j++) {
                buf[i] = value[j];
                if (buf[i] == '.' && i < decimalPoint) {
                    decimalPoint = i;
                }
            }
            // remove '-' sign
            if (negative) {
                start++;
            }

            // prepend '0' so every number has the same number of digits before the decimal point
            int nZeroPadding = maxDigitsBeforeDecimalPoint - (decimalPoint - start);
            if (nZeroPadding < 0 || nZeroPadding + 1 > start)
                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Expect " + maxDigitsBeforeDecimalPoint + " digits before decimal point at max.");
            for (int i = 0; i < nZeroPadding; i++) {
                buf[--start] = '0';
            }

            // consider negative: invert digits so that byte order matches numeric order
            if (negative) {
                buf[--start] = '-';
                for (int i = start + 1; i < buf.length; i++) {
                    int c = buf[i];
                    if (c >= '0' && c <= '9') {
                        buf[i] = (byte) ('9' - (c - '0'));
                    }
                }
            } else {
                buf[--start] = '0';
            }

            bufOffset = start;
            bufLen = buf.length - start;
        }

        /**
         * Decodes {@code buf[bufOffset, bufOffset + bufLen)} back into its plain decimal form.
         *
         * @return the number of bytes written into {@code returnValue} starting at {@code offset}
         */
        int decodeNumber(byte[] returnValue, int offset) {
            if (bufLen == 0) {
                return 0;
            }

            int in = bufOffset;
            int end = bufOffset + bufLen;
            int out = offset;

            // sign
            boolean negative = buf[in] == '-';
            if (negative) {
                returnValue[out++] = '-';
                in++;
                end--; // drop the ';' terminator
            }

            // remove padding
            byte padding = (byte) (negative ? '9' : '0');
            for (; in < end; in++) {
                if (buf[in] != padding)
                    break;
            }

            // all paddings before '.', special case for '0'
            if (in == end || !(buf[in] >= '0' && buf[in] <= '9')) {
                returnValue[out++] = '0';
            }

            // copy the rest
            if (negative) {
                for (; in < end; in++, out++) {
                    int c = buf[in];
                    if (c >= '0' && c <= '9') {
                        c = '9' - (c - '0');
                    }
                    returnValue[out] = (byte) c;
                }
            } else {
                System.arraycopy(buf, in, returnValue, out, end - in);
                out += end - in;
            }

            return out - offset;
        }
    }

    /** Per-thread codec, because {@link NumberBytesCodec} keeps its state in mutable fields. */
    static ThreadLocal<NumberBytesCodec> localCodec =
            new ThreadLocal<NumberBytesCodec>();

    // ============================================================================

    /** Forest holding the number-encoded values. */
    private TrieDictionaryForest<T> dict;

    private BytesConverter<T> converter;

    public NumberDictionaryForest() {
    }

    public NumberDictionaryForest(TrieDictionaryForest<T> dict, BytesConverter<T> converter) {
        this.dict = dict;
        this.converter = converter;
    }

    /** Returns this thread's codec, creating it on first use. */
    protected NumberBytesCodec getCodec() {
        NumberBytesCodec codec = localCodec.get();
        if (codec == null) {
            codec = new NumberBytesCodec(MAX_DIGITS_BEFORE_DECIMAL_POINT);
            localCodec.set(codec);
        }
        return codec;
    }

    @Override
    public int getMinId() {
        return dict.getMinId();
    }

    @Override
    public int getMaxId() {
        return dict.getMaxId();
    }

    @Override
    public int getSizeOfId() {
        return dict.getSizeOfId();
    }

    @Override
    public int getSizeOfValue() {
        return dict.getSizeOfValue();
    }

    /**
     * Checks whether every value of {@code another} is also present in this dictionary.
     * <p>
     * The check goes through this dictionary's own lookup (which number-encodes each value) instead
     * of delegating to the inner forest, whose trees only hold the encoded form.
     */
    @SuppressWarnings("unchecked")
    @Override
    public boolean contains(Dictionary<?> another) {
        if (another.getSize() > this.getSize()) {
            return false;
        }
        for (int i = another.getMinId(); i <= another.getMaxId(); ++i) {
            T value = (T) another.getValueFromId(i);
            if (!this.containsValue(value)) {
                return false;
            }
        }
        return true;
    }

    @Override
    protected int getIdFromValueImpl(T value, int roundingFlag) {
        if (value == null) return -1;
        byte[] data = converter.convertToBytes(value);
        return getIdFromValueBytesImpl(data, 0, data.length, roundingFlag);
    }

    @Override
    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
        NumberBytesCodec codec = getCodec();
        codec.encodeNumber(value, offset, len);
        return this.dict.getIdFromValueBytesImpl(codec.buf, codec.bufOffset, codec.bufLen, roundingFlag);
    }

    @Override
    protected T getValueFromIdImpl(int id) {
        byte[] data = getValueBytesFromIdImpl(id);
        if (data == null) return null;
        else return converter.convertFromBytes(data, 0, data.length);
    }

    @Override
    protected byte[] getValueBytesFromIdImpl(int id) {
        // Sized to the widest encoded value in the forest; decoding only strips padding/terminators.
        byte[] buf = new byte[dict.getSizeOfValue()];
        int len = getValueBytesFromIdImpl(id, buf, 0);
        return len == buf.length ? buf : Arrays.copyOf(buf, len);
    }

    @Override
    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
        NumberBytesCodec codec = getCodec();
        codec.bufOffset = 0;
        // read the encoded form into the codec buffer, then decode it into the caller's array
        codec.bufLen = this.dict.getValueBytesFromIdImpl(id, codec.buf, 0);
        return codec.decodeNumber(returnValue, offset);
    }

    @Override
    public void dump(PrintStream out) {
        dict.dump(out);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        dict.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.dict = new TrieDictionaryForest<>();
        dict.readFields(in);
        // the converter is not serialized separately; it is recovered from the forest
        this.converter = this.dict.getBytesConvert();
    }

    public BytesConverter<T> getConverter() {
        return converter;
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java new file mode 100644 index 0000000..5444bb7 --- /dev/null +++
b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java @@ -0,0 +1,58 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.dict;

import org.apache.kylin.common.util.Bytes;

/**
 * Builds a {@link NumberDictionaryForest}. Each number is encoded with
 * {@link NumberDictionaryForest.NumberBytesCodec}, and the encoded bytes are fed to a
 * {@link TrieDictionaryForestBuilder}.
 * <p>
 * Created by xiefan on 16-11-2.
 */
public class NumberDictionaryForestBuilder<T> {

    private TrieDictionaryForestBuilder<T> trieBuilder;

    private BytesConverter<T> bytesConverter;

    /** Scratch codec reused for every added value. */
    private NumberDictionaryForest.NumberBytesCodec codec = new NumberDictionaryForest.NumberBytesCodec(
            NumberDictionaryForest.MAX_DIGITS_BEFORE_DECIMAL_POINT);

    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
        this(bytesConverter, 0);
    }

    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
        this.trieBuilder = new TrieDictionaryForestBuilder<T>(bytesConverter, baseId);
        this.bytesConverter = bytesConverter;
    }

    /** Adds a value; {@code null} is ignored, as in {@link TrieDictionaryForestBuilder}. */
    public void addValue(T value) {
        if (value == null)
            return;
        addValue(bytesConverter.convertToBytes(value));
    }

    /**
     * Adds the plain decimal bytes of a number; {@code null} is ignored.
     *
     * @throws IllegalArgumentException if the number cannot be encoded
     */
    public void addValue(byte[] value) {
        if (value == null)
            return;
        codec.encodeNumber(value, 0, value.length);
        // copy out of the reusable codec buffer, which the next call overwrites
        byte[] copy = Bytes.copy(codec.buf, codec.bufOffset, codec.bufLen);
        this.trieBuilder.addValue(copy);
    }

    public NumberDictionaryForest<T> build() {
        TrieDictionaryForest<T> forest = trieBuilder.build();
        return new NumberDictionaryForest<T>(forest, bytesConverter);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java new file mode 100755 index 0000000..e9ccc56 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package org.apache.kylin.dict;

import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * A dictionary split over several {@link TrieDictionary} trees (a "forest") to keep each trie small.
 * <p>
 * An id is {@code baseId + accuOffset[tree] + idInsideTree}. To find the tree owning a value or an
 * id, {@code valueDivide} (per-tree boundary values) or {@code accuOffset} (per-tree first id,
 * relative to {@code baseId}) is binary-searched.
 * <p>
 * The input data must be in increasing order (as sorted by {@link ByteComparator}).
 * <p>
 * Created by xiefan on 16-10-26.
 */
public class TrieDictionaryForest<T> extends Dictionary<T> {

    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForest.class);

    /** Orders tree boundary values with {@link ByteArray#compareTo}. */
    private static final Comparator<ByteArray> BYTE_ARRAY_ORDER = new Comparator<ByteArray>() {
        @Override
        public int compare(ByteArray o1, ByteArray o2) {
            return o1.compareTo(o2);
        }
    };

    /** Natural order of the per-tree id offsets. */
    private static final Comparator<Integer> OFFSET_ORDER = new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o1.compareTo(o2);
        }
    };

    private ArrayList<TrieDictionary<T>> trees;

    /** Boundary value of each tree; TrieDictionaryForestBuilder stores each tree's min-id value. */
    private ArrayList<ByteArray> valueDivide;

    /** First id of each tree, relative to {@code baseId}. */
    private ArrayList<Integer> accuOffset;

    private BytesConverter<T> bytesConvert;

    private int baseId;

    public TrieDictionaryForest() { // default constructor for Writable interface
    }

    public TrieDictionaryForest(ArrayList<TrieDictionary<T>> trees,
            ArrayList<ByteArray> valueDivide, ArrayList<Integer> accuOffset, BytesConverter<T> bytesConverter, int baseId) {
        this.trees = trees;
        this.valueDivide = valueDivide;
        this.accuOffset = accuOffset;
        this.bytesConvert = bytesConverter;
        this.baseId = baseId;
    }

    /** Returns the first id of the first tree, or -1 if the forest is empty. */
    @Override
    public int getMinId() {
        if (trees.isEmpty()) return -1;
        return trees.get(0).getMinId() + baseId;
    }

    /** Returns the last id of the last tree, or -1 if the forest is empty. */
    @Override
    public int getMaxId() {
        if (trees.isEmpty()) return -1;
        int index = trees.size() - 1;
        return accuOffset.get(index) + trees.get(index).getMaxId() + baseId;
    }

    @Override
    public int getSizeOfId() {
        if (trees.isEmpty()) return -1;
        int maxOffset = accuOffset.get(accuOffset.size() - 1);
        TrieDictionary<T> lastTree = trees.get(trees.size() - 1);
        return BytesUtil.sizeForValue(baseId + maxOffset + lastTree.getMaxId() + 1);
    }

    /** Returns the largest value size over all trees, or -1 if the forest is empty. */
    @Override
    public int getSizeOfValue() {
        int maxValue = -1;
        for (TrieDictionary<T> tree : trees)
            maxValue = Math.max(maxValue, tree.getSizeOfValue());
        return maxValue;
    }

    //value --> id
    @Override
    protected int getIdFromValueImpl(T value, int roundingFlag)
            throws IllegalArgumentException {
        byte[] valueBytes = bytesConvert.convertToBytes(value);
        return getIdFromValueBytesImpl(valueBytes, 0, valueBytes.length, roundingFlag);
    }

    //id = tree_inner_offset + accumulate_offset + baseId
    @Override
    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag)
            throws IllegalArgumentException {
        int index = findIndexByValue(new ByteArray(value, offset, len));
        if (index < 0) {
            // the message shows exactly value[offset, offset + len)
            throw new IllegalArgumentException("Tree Not Found. index < 0.Value:"
                    + new String(value, offset, len, StandardCharsets.UTF_8));
        }
        // NOTE(review): rounding is resolved inside the single selected tree only; rounding up past
        // that tree's largest value does not continue into the next tree -- confirm intended.
        TrieDictionary<T> tree = trees.get(index);
        int id = tree.getIdFromValueBytes(value, offset, len, roundingFlag);
        return id + accuOffset.get(index) + baseId;
    }

    //id --> value
    @Override
    protected T getValueFromIdImpl(int id) throws IllegalArgumentException {
        byte[] data = getValueBytesFromIdImpl(id);
        if (data != null) {
            return bytesConvert.convertFromBytes(data, 0, data.length);
        } else {
            return null;
        }
    }

    @Override
    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset)
            throws IllegalArgumentException {
        int index = findIndexById(id);
        if (index < 0) {
            throw new IllegalArgumentException("Tree Not Found. index < 0");
        }
        int treeInnerOffset = getTreeInnerOffset(id, index);
        TrieDictionary<T> tree = trees.get(index);
        return tree.getValueBytesFromIdImpl(treeInnerOffset, returnValue, offset);
    }

    @Override
    protected byte[] getValueBytesFromIdImpl(int id) throws IllegalArgumentException {
        int index = findIndexById(id); //lower bound
        if (index < 0) {
            throw new IllegalArgumentException("Tree Not Found. index < 0");
        }
        int treeInnerOffset = getTreeInnerOffset(id, index);
        TrieDictionary<T> tree = trees.get(index);
        return tree.getValueBytesFromId(treeInnerOffset);
    }

    /** Converts a global id into the id inside tree {@code index}. */
    private int getTreeInnerOffset(int id, int index) {
        return id - baseId - accuOffset.get(index);
    }

    /** Dumps every tree, each preceded by a header line, to {@code out}. */
    @Override
    public void dump(PrintStream out) {
        for (int i = 0; i < trees.size(); i++) {
            out.println("----tree " + i + "--------");
            trees.get(i).dump(out);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        writeHead(out);
        writeBody(out);
    }

    /**
     * Writes the length-prefixed head: baseId, converter class name ("" if none), accuOffset,
     * valueDivide (each entry length-prefixed) and the number of trees.
     */
    private void writeHead(DataOutput out) throws IOException {
        ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
        DataOutputStream headOut = new DataOutputStream(byteBuf);
        headOut.writeInt(baseId);
        headOut.writeUTF(bytesConvert == null ? "" : bytesConvert.getClass().getName());
        //write accuOffset
        headOut.writeInt(accuOffset.size());
        for (int i = 0; i < accuOffset.size(); i++)
            headOut.writeInt(accuOffset.get(i));
        //write valueDivide
        headOut.writeInt(valueDivide.size());
        for (int i = 0; i < valueDivide.size(); i++) {
            byte[] byteStr = valueDivide.get(i).toBytes();
            headOut.writeInt(byteStr.length);
            headOut.write(byteStr);
        }
        //write tree size
        headOut.writeInt(trees.size());
        headOut.close();
        byte[] head = byteBuf.toByteArray();
        //output
        out.writeInt(head.length);
        out.write(head);
    }

    /** Writes every tree in order. */
    private void writeBody(DataOutput out) throws IOException {
        for (int i = 0; i < trees.size(); i++) {
            trees.get(i).write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        try {
            // head length prefix; the head fields are parsed sequentially below
            int headSize = in.readInt();
            this.baseId = in.readInt();
            String converterName = in.readUTF();
            if (converterName.isEmpty() == false)
                this.bytesConvert = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
            //init accuOffset
            int accuSize = in.readInt();
            this.accuOffset = new ArrayList<>();
            for (int i = 0; i < accuSize; i++) {
                accuOffset.add(in.readInt());
            }
            //init valueDivide
            int valueDivideSize = in.readInt();
            this.valueDivide = new ArrayList<>();
            for (int i = 0; i < valueDivideSize; i++) {
                int length = in.readInt();
                byte[] buffer = new byte[length];
                in.readFully(buffer);
                valueDivide.add(new ByteArray(buffer, 0, buffer.length));
            }
            int treeSize = in.readInt();
            this.trees = new ArrayList<>();
            for (int i = 0; i < treeSize; i++) {
                TrieDictionary<T> dict = new TrieDictionary<>();
                dict.readFields(in);
                trees.add(dict);
            }
        } catch (Exception e) {
            if (e instanceof RuntimeException)
                throw (RuntimeException) e;
            else
                throw new RuntimeException(e);
        }
    }

    /** Returns true if every value of {@code other} can be found in this forest. */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public boolean contains(Dictionary other) {
        if (other.getSize() > this.getSize()) {
            return false;
        }

        for (int i = other.getMinId(); i <= other.getMaxId(); ++i) {
            T v = (T) other.getValueFromId(i);
            if (!this.containsValue(v)) {
                return false;
            }
        }
        return true;
    }

    public List<TrieDictionary<T>> getTrees() {
        return Collections.unmodifiableList(this.trees);
    }

    /** Index of the tree whose boundary value is the greatest one {@code <= value}, or -1. */
    private int findIndexByValue(ByteArray value) {
        return lowerBound(value, BYTE_ARRAY_ORDER, this.valueDivide);
    }

    /** Index of the tree whose first id is the greatest one {@code <= id}, or -1. */
    private int findIndexById(int id) {
        return lowerBound(Integer.valueOf(id - baseId), OFFSET_ORDER, this.accuOffset);
    }

    /**
     * Binary search over a sorted list.
     *
     * @return the index of an element equal to {@code lookfor}, otherwise the index of the greatest
     *         element smaller than it (-1 if all elements are larger); 0 for a null or empty list
     */
    private static <K> int lowerBound(K lookfor, Comparator<K> comparator, ArrayList<K> list) {
        if (list == null || list.isEmpty())
            return 0; //return the first tree
        int left = 0;
        int right = list.size() - 1;
        int mid = 0;
        boolean found = false;
        int comp = 0;
        while (!found && left <= right) {
            mid = left + (right - left) / 2;
            comp = comparator.compare(lookfor, list.get(mid));
            if (comp < 0)
                right = mid - 1;
            else if (comp > 0)
                left = mid + 1;
            else
                found = true;
        }
        if (found) {
            return mid;
        } else {
            // the loop ends with left == right + 1, so this is 'right'
            return Math.min(left, right); //value may be bigger than the right tree
        }
    }

    /** Ad-hoc demo printing {@link #lowerBound} results for a small sorted list of strings. */
    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();
        // CJK numerals U+4E00, U+4E8C, U+4E09 (presumably what the garbled literals originally were);
        // written as escapes so the source-file encoding cannot corrupt them again
        list.add("\u4e00");
        list.add("\u4e8c");
        list.add("\u4e09");
        list.add("");
        list.add("part");
        list.add("par");
        list.add("partition");
        list.add("party");
        list.add("parties");
        list.add("paint");
        Collections.sort(list);
        Comparator<String> comp = new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return o1.compareTo(o2);
            }
        };
        for (String str : list) {
            System.out.println("found value:" + str + " index:" + lowerBound(str, comp, list));
        }
    }

    public BytesConverter<T> getBytesConvert() {
        return bytesConvert;
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java new file mode 100755 index 0000000..3c03c08 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package org.apache.kylin.dict;

import org.apache.kylin.common.util.ByteArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

/**
 * Builds a {@link TrieDictionaryForest} from values added in ascending order.
 * <p>
 * Values go into the current {@link TrieDictionaryBuilder}. Once the accumulated value bytes of
 * that tree reach {@link #MaxTrieTreeSize}, the tree is built and a new one is started. A value
 * equal to the previously added one is dropped.
 */
public class TrieDictionaryForestBuilder<T> {

    /** Accumulated value bytes after which the current tree is closed. */
    public static int MaxTrieTreeSize = 1024 * 1024;//1M

    private BytesConverter<T> bytesConverter;

    /** Total length of the values added to the current tree. */
    private int curTreeSize = 0;

    /** Number of values added to the current tree; empty values count although they add no bytes. */
    private int curTreeValueCount = 0;

    private TrieDictionaryBuilder<T> trieBuilder;

    private ArrayList<TrieDictionary<T>> trees = new ArrayList<>();

    private ArrayList<ByteArray> valueDivide = new ArrayList<>(); //find tree

    private ArrayList<Integer> accuOffset = new ArrayList<>(); //find tree

    private ByteArray previousValue = null; //value use for remove duplicate

    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForestBuilder.class);

    private int baseId;

    /** Id offset (relative to baseId) at which the next tree starts. */
    private int curOffset;

    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
        this(bytesConverter, 0);
    }

    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
        this.bytesConverter = bytesConverter;
        this.trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
        this.baseId = baseId;
        curOffset = 0;
    }

    /** Adds a value; {@code null} is ignored. */
    public void addValue(T value) {
        if (value == null) return;
        byte[] valueBytes = bytesConverter.convertToBytes(value);
        addValue(new ByteArray(valueBytes, 0, valueBytes.length));
    }

    /** Adds a value given as bytes; {@code null} is ignored. The array is not copied. */
    public void addValue(byte[] value) {
        if (value == null) return;
        addValue(new ByteArray(value, 0, value.length));
    }

    /** Adds a value given as a byte range; {@code null} is ignored. */
    public void addValue(ByteArray value) {
        if (value == null) return;
        if (previousValue != null) {
            int comp = previousValue.compareTo(value);
            if (comp == 0) return; //duplicate value
            // NOTE(review): comp > 0 means the input is not ascending; lookups in the resulting
            // forest rely on that order, yet such values are currently accepted silently.
        }
        this.trieBuilder.addValue(exactBytes(value));
        previousValue = value;
        this.curTreeValueCount++;
        this.curTreeSize += value.length();
        if (curTreeSize >= MaxTrieTreeSize) {
            closeCurrentTree();
        }
    }

    /** Builds the forest, first closing the current tree if it received any value. */
    public TrieDictionaryForest<T> build() {
        if (curTreeValueCount > 0) { //last tree, possibly holding only empty values
            closeCurrentTree();
        }
        return new TrieDictionaryForest<T>(this.trees,
                this.valueDivide, this.accuOffset, this.bytesConverter, baseId);
    }

    /** Returns exactly the bytes in range of {@code value}, avoiding a copy when it spans its whole array. */
    private static byte[] exactBytes(ByteArray value) {
        byte[] backing = value.array();
        if (value.offset() == 0 && value.length() == backing.length) {
            return backing;
        }
        return value.toBytes();
    }

    /** Builds the current tree, registers it, and starts a fresh one. */
    private void closeCurrentTree() {
        TrieDictionary<T> tree = trieBuilder.build(0);
        addTree(tree);
        reset();
    }

    /** Records the tree together with its first id offset and its min-id value (used to route lookups). */
    private void addTree(TrieDictionary<T> tree) {
        trees.add(tree);
        int minId = tree.getMinId();
        accuOffset.add(curOffset);
        byte[] valueBytes = tree.getValueBytesFromId(minId);
        valueDivide.add(new ByteArray(valueBytes, 0, valueBytes.length));
        curOffset += (tree.getMaxId() + 1);
    }

    private void reset() {
        curTreeSize = 0;
        curTreeValueCount = 0;
        trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
    }

}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java new file mode 100755 index 0000000..624d6ba --- /dev/null +++
b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link TrieDictionaryForest} built through {@link TrieDictionaryForestBuilder}.
+ * <p>
+ * Created by xiefan on 16-10-26.
+ */
+
+public class TrieDictionaryForestTest {
+
+
+    @Test
+    public void testBasicFound() {
+        ArrayList<String> strs = new ArrayList<String>();
+        strs.add("part");
+        strs.add("par");
+        strs.add("partition");
+        strs.add("party");
+        strs.add("parties");
+        strs.add("paint");
+        Collections.sort(strs);
+        int baseId = 0;
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId);
+        TrieDictionaryForest<String> dict = builder.build();
+        dict.dump(System.out);
+        int expectId = baseId;
+        for (String s : strs) {
+            System.out.println("value:" + s + " expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(s));
+            expectId++;
+        }
+        System.out.println("test ok");
+    }
+
+    @Test //one string one tree
+    public void testMultiTree() {
+        ArrayList<String> strs = new ArrayList<String>();
+        strs.add("part");
+        strs.add("par");
+        strs.add("partition");
+        strs.add("party");
+        strs.add("parties");
+        strs.add("paint");
+        strs.add("一二三"); //Chinese test
+        strs.add("四五六");
+        strs.add("");
+        Collections.sort(strs, new ByteComparator<String>(new StringBytesConverter()));
+        int baseId = 5;
+        int maxTreeSize = 0;
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId, maxTreeSize);
+        TrieDictionaryForest<String> dict = builder.build();
+        dict.dump(System.out);
+        assertEquals(strs.size(), dict.getTrees().size());
+        int expectId = baseId;
+        for (String s : strs) {
+            System.out.println("value:" + s + " expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(s));
+            expectId++;
+        }
+        System.out.println("test ok");
+    }
+
+    public void duplicateDataTest() {
+        //todo
+    }
+
+    @Test
+    public void testBigDataSet() {
+        // generate data
+        ArrayList<String> strs = new ArrayList<>();
+        Iterator<String> it = new RandomStrings(100 * 10000).iterator();
+        int totalSize = 0;
+        final StringBytesConverter converter = new StringBytesConverter();
+        while (it.hasNext()) {
+            String str = it.next();
+            byte[] data = converter.convertToBytes(str);
+            if (data != null) {
+                totalSize += data.length;
+            }
+            strs.add(str);
+        }
+        Collections.sort(strs);
+        int baseId = 20;
+        int maxTreeSize = totalSize / 10;
+        System.out.println("data size:" + totalSize / 1024 + "KB max tree size:" + maxTreeSize / 1024 + "KB");
+        //create the answer set
+        Map<String, Integer> idMap = rightIdMap(baseId, strs);
+        //build tree
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId, maxTreeSize);
+        TrieDictionaryForest<String> dict = builder.build();
+        System.out.println("tree num:" + dict.getTrees().size());
+        //check
+        for (Map.Entry<String, Integer> entry : idMap.entrySet()) {
+            assertEquals(0, dict.getIdFromValue(entry.getKey()) - entry.getValue());
+            assertEquals(entry.getKey(), dict.getValueFromId(entry.getValue()));
+        }
+    }
+
+    @Test
+    public void partOverflowTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        // str.add("");
+        str.add("part");
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        String longStr = "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk";
+        System.out.println("The length of the long string is " + longStr.length());
+        str.add(longStr);
+
+        str.add("zzzzzz" + longStr);// another long string
+        int baseId = 10;
+        int maxSize = 100 * 1024 * 1024;
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, maxSize);
+        TrieDictionaryForest<String> dict = b.build();
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : str) {
+            set.add(s);
+        }
+        // test basic id<==>value
+        Iterator<String> it = set.iterator();
+        int id = 0;
+        int previousId = -1;
+        for (; it.hasNext(); id++) {
+            String value = it.next();
+
+            // in case of overflow parts, there exist interpolation nodes
+            // they exist to make sure that any node's part is shorter than 255
+            int actualId = dict.getIdFromValue(value);
+            assertTrue(actualId >= id);
+            assertTrue(actualId > previousId);
+            previousId = actualId;
+
+            assertEquals(value, dict.getValueFromId(actualId));
+        }
+    }
+
+    @Test
+    public void notFoundTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+
+        ArrayList<String> notFound = new ArrayList<String>();
+        notFound.add("");
+        notFound.add("p");
+        notFound.add("pa");
+        notFound.add("pb");
+        notFound.add("parti");
+        notFound.add("partz");
+        notFound.add("partyz");
+
+        testStringDictionary(str, notFound);
+    }
+
+
+    @Test
+    public void dictionaryContainTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("part"); // meant to be dup
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId);
+        TrieDictionaryForest<String> dict = b.build();
+        str.add("py");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        // both dictionaries share the same base id
+        b = newDictBuilder(str, baseId);
+        TrieDictionaryForest<String> dict2 = b.build();
+
+        assertEquals(true, dict2.contains(dict));
+        assertEquals(false, dict.contains(dict2));
+    }
+
+    @Test
+    public void englishWordsTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
+        ArrayList<String> str = loadStrings(is);
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        testStringDictionary(str, null);
+    }
+
+    @Test
+    public void categoryNamesTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
+        ArrayList<String> str = loadStrings(is);
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        testStringDictionary(str, null);
+    }
+
+    @Test
+    public void serializeTest() {
+        ArrayList<String> testData = getTestData(10);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(testData, 10, 0);
+        TrieDictionaryForest<String> dict = b.build();
+        dict = testSerialize(dict);
+        dict.dump(System.out);
+        for (String str : testData) {
+            assertEquals(str, dict.getValueFromId(dict.getIdFromValue(str)));
+        }
+    }
+
+
+    /** Round-trips a forest through write/readFields and returns the deserialized copy. */
+    private static TrieDictionaryForest<String> testSerialize(TrieDictionaryForest<String> dict) {
+        try {
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream dataout = new DataOutputStream(bout);
+            dict.write(dataout);
+            dataout.close();
+            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+            DataInputStream datain = new DataInputStream(bin);
+            TrieDictionaryForest<String> r = new TrieDictionaryForest<>();
+            r.readFields(datain);
+            datain.close();
+            return r;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*@Test
+    public void getIdFromValueBytesTest() throws Exception{
+        String value = "一二三";
+        BytesConverter<String> converter = new StringBytesConverter();
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<>(converter,0);
+        b.addValue(value);
+        TrieDictionaryForest<String> dict = b.build();
+        dict.dump(System.out);
+        byte[] data = converter.convertToBytes(value);
+        int id = dict.getIdFromValueBytes(data,0,data.length);
+
+    }*/
+
+    //benchmark
+    @Deprecated
+    public void memoryUsageBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        int testTimes = 1;
+        System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+        System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+        for (int i = 0; i < testTimes; i++) {
+            long start = MemoryBudgetController.gcAndGetSystemAvailMB();
+            TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String str : testData)
+                b.addValue(str);
+            long end = MemoryBudgetController.gcAndGetSystemAvailMB();
+            System.out.println("object trie memory usage:" + (end - start) + "MB");
+            System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+            System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+            /*System.out.println(b == null);
+            startMemUse = getSystemCurUsedMemory();
+            TrieDictionary<String> dict = b.build(0);
+            memUse = getSystemCurUsedMemory();
+            System.out.println("array trie memory usage:"+(memUse-startMemUse)/(1024*1024)+"MB");
+            System.out.println(b == null );
+            System.out.println(dict == null);*/
+        }
+
+
+    }
+
+    @Deprecated
+    private long getSystemCurUsedMemory() throws Exception {
+        System.gc();
+        // Thread.sleep is static; calling it through currentThread() only obscures that.
+        Thread.sleep(1000);
+        long totalMem = Runtime.getRuntime().totalMemory();
+        long useMem = totalMem - Runtime.getRuntime().freeMemory();
+        return useMem;
+    }
+
+    //@Test
+    public void buildTimeBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        //build time compare
+        int testTimes = 5;
+        long oldDictTotalBuildTime = 0;
+        long newDictTotalBuildTime = 0;
+
+        //old dict
+        System.gc();
+        Thread.sleep(1000);
+        for (int i = 0; i < testTimes; i++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            TrieDictionaryBuilder<String> oldTrieBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String str : testData)
+                oldTrieBuilder.addValue(str);
+            TrieDictionary<String> oldDict = oldTrieBuilder.build(0);
+            keep |= oldDict.getIdFromValue(testData.get(0));
+            oldDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+            System.out.println("times:" + i);
+        }
+
+        //new dict
+        System.gc();
+        Thread.sleep(1000);
+        for (int i = 0; i < testTimes; i++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            BytesConverter<String> converter = new StringBytesConverter();
+            TrieDictionaryForestBuilder<String> newTrieBuilder = new TrieDictionaryForestBuilder<String>(converter, 0);
+            for (String str : testData)
+                newTrieBuilder.addValue(str);
+            TrieDictionaryForest<String> newDict = newTrieBuilder.build();
+            keep |= newDict.getIdFromValue(testData.get(0));
+            newDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+            System.out.println("times:" + i);
+        }
+
+
+        System.out.println("compare build time. Old trie : " + oldDictTotalBuildTime / 1000.0 + "s.New trie : " + newDictTotalBuildTime / 1000.0 + "s");
+    }
+
+
+    // Benchmark over ~2.7 million random 64-char strings; far too heavy for the regular unit test run,
+    // so it is disabled like the other benchmarks in this class. Remove @Ignore to run it manually.
+    @Ignore("benchmark: ~2.7 million random strings; run manually")
+    @Test
+    public void queryTimeBenchmarkTest() throws Exception {
+        int count = (int) (Integer.MAX_VALUE * 0.8 / 640);
+        //int count = (int) (2);
+        benchmarkStringDictionary(new RandomStrings(count));
+    }
+
+
+    private void evaluateDataSize(ArrayList<String> list) {
+        long size = 0;
+        for (String str : list)
+            size += str.getBytes().length;
+        System.out.println("test data size : " + size / (1024 * 1024) + " MB");
+    }
+
+    private void evaluateDataSize(int count) {
+        RandomStrings rs = new RandomStrings(count);
+        Iterator<String> itr = rs.iterator();
+        long bytesCount = 0;
+        while (itr.hasNext())
+            bytesCount += itr.next().getBytes().length;
+        System.out.println("test data size : " + bytesCount / (1024 * 1024) + " MB");
+    }
+
+    /** Builds a forest from the given strings and compares its lookup speed against HashMap / array lookups. */
+    private static void benchmarkStringDictionary(Iterable<String> str) throws IOException {
+        Iterator<String> itr = str.iterator();
+        ArrayList<String> testData = new ArrayList<>();
+        while (itr.hasNext())
+            testData.add(itr.next());
+        Collections.sort(testData);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(testData, 0);
+        TrieDictionaryForest<String> dict = b.build();
+        System.out.println("tree size:" + dict.getTrees().size());
+        BytesConverter<String> converter = new StringBytesConverter();
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : testData) {
+            set.add(s);
+        }
+        // prepare id==>value array and value==>id map
+        HashMap<String, Integer> map = new HashMap<String, Integer>();
+        String[] strArray = new String[set.size()];
+        byte[][] array = new byte[set.size()][];
+        Iterator<String> it = set.iterator();
+        for (int id = 0; it.hasNext(); id++) {
+            String value = it.next();
+            map.put(value, id);
+            strArray[id] = value;
+            array[id] = converter.convertToBytes(value);
+        }
+
+        // warm-up, said that code only got JIT after run 1k-10k times,
+        // following jvm options may help
+        // -XX:CompileThreshold=1500
+        // -XX:+PrintCompilation
+        System.out.println("Benchmark awaitig...");
+        benchmark("Warm up", dict, set, map, strArray, array);
+        benchmark("Benchmark", dict, set, map, strArray, array);
+    }
+
+    private static int benchmark(String msg, TrieDictionaryForest<String> dict, TreeSet<String> set, HashMap<String, Integer> map, String[] strArray, byte[][] array) {
+        int n = set.size();
+        int times = Math.max(10 * 1000 * 1000 / n, 1); // run 10 million lookups
+        int keep = 0; // make sure JIT don't OPT OUT function calls under test
+        byte[] valueBytes = new byte[dict.getSizeOfValue()];
+        long start;
+
+        // benchmark value==>id, via HashMap
+        System.out.println(msg + " HashMap lookup value==>id");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= map.get(strArray[j]);
+            }
+        }
+        long timeValueToIdByMap = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByMap);
+
+        // benchmark value==>id, via Dict
+        System.out.println(msg + " Dictionary lookup value==>id");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= dict.getIdFromValueBytes(array[j], 0, array[j].length);
+            }
+        }
+        long timeValueToIdByDict = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByDict);
+
+        // benchmark id==>value, via Array
+        System.out.println(msg + " Array lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= strArray[j].length();
+            }
+        }
+        long timeIdToValueByArray = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByArray);
+
+        // benchmark id==>value, via Dict
+        System.out.println(msg + " Dictionary lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= dict.getValueBytesFromId(j, valueBytes, 0);
+            }
+        }
+        long timeIdToValueByDict = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByDict);
+
+        return keep;
+    }
+
+    /**
+     * Builds a forest (2-byte trie size limit, so many tries) with a random base id and checks id<==>value round
+     * trips, not-found values, out-of-range ids and null handling.
+     */
+    private static void testStringDictionary(ArrayList<String> str, ArrayList<String> notFound) {
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, 2);
+        TrieDictionaryForest<String> dict = b.build();
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : str) {
+            set.add(s);
+        }
+
+        // test serialize
+        //dict = testSerialize(dict);
+
+        // test basic id<==>value
+        Iterator<String> it = set.iterator();
+        int id = baseId;
+        for (; it.hasNext(); id++) {
+            String value = it.next();
+            assertEquals(id, dict.getIdFromValue(value));
+            assertEquals(value, dict.getValueFromId(id));
+        }
+
+        //test not found value
+        if (notFound != null) {
+            for (String s : notFound) {
+                try {
+                    int nullId = dict.getIdFromValue(s);
+                    System.out.println("null value id:" + nullId);
+                    fail("For not found value '" + s + "', IllegalArgumentException is expected");
+                } catch (IllegalArgumentException e) {
+                    // good
+                }
+            }
+        }
+        int maxId = dict.getMaxId();
+        // note: -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, so spell it directly
+        int[] notExistIds = {-10, -20, Integer.MIN_VALUE, -Integer.MAX_VALUE, maxId + 1, maxId + 2};
+        for (Integer i : notExistIds) {
+            try {
+                dict.getValueFromId(i);
+                fail("For not found id '" + i + "', IllegalArgumentException is expected");
+            } catch (IllegalArgumentException e) {
+                // good
+            }
+        }
+
+        // test null value
+        int nullId = dict.getIdFromValue(null);
+        assertNull(dict.getValueFromId(nullId));
+        int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
+        assertEquals(dict.getValueBytesFromId(nullId2, null, 0), -1);
+        assertEquals(nullId, nullId2);
+    }
+
+    /** Maps each string to its expected id: baseId plus its position in {@code strs}. */
+    private Map<String, Integer> rightIdMap(int baseId, ArrayList<String> strs) {
+        Map<String, Integer> result = new HashMap<>();
+        int expectId = baseId;
+        for (String str : strs) {
+            result.put(str, expectId);
+            expectId++;
+        }
+        return result;
+    }
+
+    /** Creates a builder with the current (default) trie size limit and feeds it {@code strs} in order. */
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId) {
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        for (String s : strs)
+            b.addValue(s);
+        return b;
+    }
+
+    /**
+     * Creates a builder and feeds it {@code strs} in order, using {@code treeSize} as the per-trie size limit.
+     * <p>
+     * {@code TrieDictionaryForestBuilder.MaxTrieTreeSize} is static, so the previous limit is restored once the
+     * values are added. Otherwise the limit would leak into every later test (including those using the 2-arg
+     * builder) and make results depend on test execution order.
+     */
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId, int treeSize) {
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        int previousMaxTreeSize = TrieDictionaryForestBuilder.MaxTrieTreeSize;
+        TrieDictionaryForestBuilder.MaxTrieTreeSize = treeSize;
+        try {
+            for (String s : strs)
+                b.addValue(s);
+        } finally {
+            TrieDictionaryForestBuilder.MaxTrieTreeSize = previousMaxTreeSize;
+        }
+        return b;
+    }
+
+    /** Iterable over {@code size} random 64-character lowercase hex strings. */
+    private static class RandomStrings implements Iterable<String> {
+        final private int size;
+
+        public RandomStrings(int size) {
+            this.size = size;
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return new Iterator<String>() {
+                Random rand = new Random(System.currentTimeMillis());
+                int i = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return i < size;
+                }
+
+                @Override
+                public String next() {
+                    if (!hasNext())
+                        throw new NoSuchElementException();
+
+                    i++;
+                    return nextString();
+                }
+
+                private String nextString() {
+                    StringBuilder buf = new StringBuilder();
+                    for (int i = 0; i < 64; i++) {
+                        int v = rand.nextInt(16);
+                        char c;
+                        if (v >= 0 && v <= 9)
+                            c = (char) ('0' + v);
+                        else
+                            c = (char) ('a' + v - 10);
+                        buf.append(c);
+                    }
+                    return buf.toString();
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+    /** Reads non-empty, trimmed UTF-8 lines from {@code is}; the stream is closed afterwards. */
+    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+        ArrayList<String> r = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String word;
+            while ((word = reader.readLine()) != null) {
+                word = word.trim();
+                if (word.isEmpty() == false)
+                    r.add(word);
+            }
+        } finally {
+            reader.close();
+            is.close();
+        }
+        return r;
+    }
+
+
+    /** Returns {@code count} random strings sorted in byte order (the order the forest builder expects). */
+    private ArrayList<String> getTestData(int count) {
+        RandomStrings rs = new RandomStrings(count);
+        Iterator<String> itr = rs.iterator();
+        ArrayList<String> testData = new ArrayList<>();
+        while (itr.hasNext())
+            testData.add(itr.next());
+        Collections.sort(testData, new ByteComparator<String>(new StringBytesConverter()));
+        evaluateDataSize(testData);
+        return testData;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
new file mode 100644
index 0000000..dfc6b2c
--- /dev/null
+++
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Routes mapper output to reducers by the first byte of the key text.
+ * <ul>
+ * <li>{@code MARK_FOR_HLL} keys go to the last reducer ({@code numReduceTasks - 1}).</li>
+ * <li>{@code MARK_FOR_PARTITION_COL} keys go to the second-to-last reducer ({@code numReduceTasks - 2}).</li>
+ * <li>Any other key goes to the reducer whose index is the first byte read as an unsigned value.</li>
+ * </ul>
+ * NOTE(review): this assumes the job allocated the two extra reducers whenever marker keys are emitted.
+ */
+public class FactDistinctColumnPartitioner2 extends Partitioner<SelfDefineSortableKey, Text> {
+
+    @Override
+    public int getPartition(SelfDefineSortableKey key, Text value, int numReduceTasks) {
+        byte[] keyBytes = key.getText().getBytes();
+        byte marker = keyBytes[0];
+        if (marker == FactDistinctHiveColumnsMapper2.MARK_FOR_HLL) {
+            // the last reducer is for merging hll
+            return numReduceTasks - 1;
+        } else if (marker == FactDistinctHiveColumnsMapper2.MARK_FOR_PARTITION_COL) {
+            // the second-to-last reducer is for the partition column
+            return numReduceTasks - 2;
+        } else {
+            // ordinary value: the first byte carries the column index
+            return BytesUtil.readUnsigned(keyBytes, 0, 1);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
new file mode 100644
index 0000000..6ff07f0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+import java.io.IOException;
+
+/**
+ * Map-side combiner for {@link FactDistinctColumnsJob2}: forwards a single value per distinct key.
+ * <p>
+ * A combiner's output key/value classes must equal the job's map output classes. For this job those are
+ * {@link SelfDefineSortableKey} / {@link Text} (see {@code FactDistinctColumnsJob2#setupMapper}), because combined
+ * records are written back into the map output. Emitting a bare {@code Text} key would make Hadoop reject the
+ * record with a "wrong key class" error whenever the combiner runs, so the original key object is emitted.
+ *
+ * @author yangli9
+ */
+public class FactDistinctColumnsCombiner2 extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> {
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    @Override
+    public void reduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        // for hll, each key only has one output, no need to do local combine;
+        // for normal col, values are empty text
+        context.write(key, values.iterator().next());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
new file mode 100644
index 0000000..4d26402
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fact-distinct-columns MR job variant whose map output key is {@link SelfDefineSortableKey}.
+ * <p>
+ * Reducer count: one per dictionary column on the fact table, plus two extra reducers when statistics are
+ * enabled. {@link FactDistinctColumnPartitioner2} routes HLL keys to the last reducer and partition-column keys
+ * to the second-to-last one.
+ */
+public class FactDistinctColumnsJob2 extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsJob2.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_STATISTICS_ENABLED);
+            options.addOption(OPTION_STATISTICS_OUTPUT);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String jobId = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, jobId);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            String statisticsEnabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+            String statisticsOutput = getOptionValue(OPTION_STATISTICS_OUTPUT);
+            String statisticsSamplingPercent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            if (cube == null) {
+                // fail with a clear message instead of an NPE on cube.getDescriptor()
+                throw new IllegalStateException("Cube '" + cubeName + "' not found");
+            }
+            List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statisticsEnabled);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statisticsOutput);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statisticsSamplingPercent);
+            logger.info("Starting: " + job.getJobName());
+            logger.info("using FactDistinctColumnsJob2");
+
+            setJobClasspath(job, cube.getConfig());
+
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            if (segment == null) {
+                logger.error("Failed to find {} in cube {}", segmentID, cube);
+                // plain string concatenation here: System.out does not substitute SLF4J "{}" placeholders
+                System.out.println("Failed to find " + segmentID + " in cube " + cube);
+                for (CubeSegment s : cube.getSegments()) {
+                    logger.error(s.getName() + " with status " + s.getStatus());
+                    System.out.println(s.getName() + " with status " + s.getStatus());
+                }
+                throw new IllegalStateException("Failed to find segment " + segmentID + " in cube " + cube);
+            } else {
+                logger.info("Found segment: " + segment);
+                System.out.println("Found segment " + segment);
+            }
+            setupMapper(segment);
+            // NOTE(review): the two extra reducers (HLL, partition column) only exist when statistics are enabled;
+            // confirm FactDistinctHiveColumnsMapper2 never emits marker keys otherwise, or the partitioner
+            // would route them onto dictionary-column reducers.
+            setupReducer(output, "true".equalsIgnoreCase(statisticsEnabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+
+    /** Configures the flat-table input, the mapper/combiner and the map output types for the given segment. */
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(FactDistinctHiveColumnsMapper2.class);
+        job.setCombinerClass(FactDistinctColumnsCombiner2.class);
+        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    /** Configures reducer, partitioner, sequence-file output and reducer count; clears any previous output. */
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        // NOTE(review): the map output key class is SelfDefineSortableKey; confirm FactDistinctColumnsReducer's
+        // input key type accepts it (a reducer declared with Text input keys would fail with a ClassCastException).
+        job.setReducerClass(FactDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setPartitionerClass(FactDistinctColumnPartitioner2.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        FactDistinctColumnsJob2 job = new FactDistinctColumnsJob2();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java new file mode 100644 index 0000000..2e9a2dc --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.steps.fdc2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.TblColRef; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +/** + */ +public class FactDistinctColumnsMapperBase2<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> { + + protected String cubeName; + protected CubeInstance cube; + protected CubeSegment cubeSeg; + protected CubeDesc cubeDesc; + protected long baseCuboidId; + protected List<TblColRef> factDictCols; + protected IMRTableInputFormat flatTableInputFormat; + + protected Text outputKey = new Text(); + protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); + protected Text outputValue = new Text(); + protected int errorRecordCounter = 0; + + protected CubeJoinedFlatTableEnrich intermediateTableDesc; + protected int[] dictionaryColumnIndex; + + @Override + protected void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + bindCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + cube = CubeManager.getInstance(config).getCube(cubeName); + cubeSeg = 
cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); + cubeDesc = cube.getDescriptor(); + baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); + + flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); + + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); + dictionaryColumnIndex = new int[factDictCols.size()]; + for (int i = 0; i < factDictCols.size(); i++) { + TblColRef colRef = factDictCols.get(i); + int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); + dictionaryColumnIndex[i] = columnIndexOnFlatTbl; + } + + } + + protected void handleErrorRecord(String[] record, Exception ex) throws IOException { + + System.err.println("Insane record: " + Arrays.toString(record)); + ex.printStackTrace(System.err); + + errorRecordCounter++; + if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) { + if (ex instanceof IOException) + throw (IOException) ex; + else if (ex instanceof RuntimeException) + throw (RuntimeException) ex; + else + throw new RuntimeException("", ex); + } + } +}