KYLIN-2506 Refactor Global Dictionary
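At its core, this refactor replaces the old CachedTreeMap-backed slice storage with explicit GlobalDictStore/GlobalDictHDFSStore classes plus a Guava LoadingCache of dictionary slices, as the diff below shows. The following minimal sketch illustrates that read-path caching pattern in isolation, under stated assumptions: SliceStore and the plain String keys are simplified, hypothetical stand-ins for GlobalDictHDFSStore and DictSliceKey, not Kylin's actual API.

    // Sketch only: slices are loaded lazily through a LoadingCache with soft
    // values, and a lookup first locates its slice via TreeMap.floorKey(),
    // mirroring AppendTrieDictionary.init() and
    // getIdFromValueBytesWithoutCache() in the patch below.
    import java.util.TreeMap;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;

    public class SliceCacheSketch {

        /** Hypothetical storage layer; in the patch this role is played by GlobalDictHDFSStore. */
        public interface SliceStore {
            byte[] readSlice(String fileName);
        }

        // slice start key -> slice file name, like GlobalDictMetadata.sliceFileMap
        private final TreeMap<String, String> sliceFileMap = new TreeMap<>();
        private final LoadingCache<String, byte[]> sliceCache;

        public SliceCacheSketch(final SliceStore store) {
            this.sliceCache = CacheBuilder.newBuilder()
                    .softValues() // GC may evict slices under memory pressure
                    .removalListener(new RemovalListener<String, byte[]>() {
                        @Override
                        public void onRemoval(RemovalNotification<String, byte[]> n) {
                            System.out.println("Evicted slice " + n.getKey() + " caused by " + n.getCause());
                        }
                    })
                    .build(new CacheLoader<String, byte[]>() {
                        @Override
                        public byte[] load(String key) {
                            // on a cache miss, read the slice back from the store
                            return store.readSlice(sliceFileMap.get(key));
                        }
                    });
        }

        public byte[] findSliceFor(String value) throws ExecutionException {
            // floorKey() picks the slice whose start key is <= the value
            String sliceKey = sliceFileMap.floorKey(value);
            if (sliceKey == null) {
                sliceKey = sliceFileMap.firstKey();
            }
            return sliceCache.get(sliceKey);
        }
    }

The design trade-off this pattern embodies: soft-valued entries let the JVM reclaim rarely used slices under memory pressure instead of requiring a fixed cache size, which is what the removed CachedTreeMap had to manage by hand.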
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ce8b24f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ce8b24f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ce8b24f6

Branch: refs/heads/KYLIN-2506
Commit: ce8b24f60a01d03eb57a223dbae095e7ceb98c7f
Parents: 1caf19a
Author: kangkaisen <kangkai...@163.com>
Authored: Mon Feb 20 21:06:44 2017 +0800
Committer: kangkaisen <kangkai...@163.com>
Committed: Thu Apr 13 16:20:57 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/AppendTrieDictionary.java | 1197 +-----------------
 .../kylin/dict/AppendTrieDictionaryBuilder.java |  289 +++++
 .../kylin/dict/AppendTrieDictionaryChecker.java |    9 +-
 .../org/apache/kylin/dict/CachedTreeMap.java    |  481 -------
 .../java/org/apache/kylin/dict/DictNode.java    |  376 ++++++
 .../java/org/apache/kylin/dict/DictSlice.java   |  283 +++++
 .../org/apache/kylin/dict/DictSliceKey.java     |   75 ++
 .../apache/kylin/dict/GlobalDictHDFSStore.java  |  420 ++++++
 .../apache/kylin/dict/GlobalDictMetadata.java   |   50 +
 .../org/apache/kylin/dict/GlobalDictStore.java  |  102 ++
 .../kylin/dict/GlobalDictionaryBuilder.java     |   12 +-
 .../kylin/dict/AppendTrieDictionaryTest.java    |  329 ++++-
 .../apache/kylin/dict/CachedTreeMapTest.java    |  378 ------
 .../engine/spark/KylinKryoRegistrator.java      |    3 -
 14 files changed, 1946 insertions(+), 2058 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 962686d..ea216ba 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -15,1173 +15,125 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/ - package org.apache.kylin.dict; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.NavigableSet; import java.util.Objects; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.MetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutionException; +import static com.google.common.base.Preconditions.checkState; /** * A dictionary based on Trie data structure that maps enumerations of byte[] to * int IDs, used for global dictionary. - * - * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size. - * + * <p> + * Trie data is split into sub trees, called {@link DictSlice}. + * <p> * With Trie the memory footprint of the mapping is kinda minimized at the cost * of CPU, compared to a HashMap of ID arrays. Performance tests show Trie is * roughly 10 times slower, so a cache layer overlays the Trie and * gracefully falls back to it using weak references. - * + * <p> * The implementation is NOT thread-safe for now.
- * + * <p> TODO making it thread-safe * * @author sunyerui */ @SuppressWarnings({ "rawtypes", "unchecked", "serial" }) public class AppendTrieDictionary<T> extends CacheDictionary<T> { - public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x6E, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict" public static final int HEAD_SIZE_I = HEAD_MAGIC.length; - - public static final int BIT_IS_LAST_CHILD = 0x80; - public static final int BIT_IS_END_OF_VALUE = 0x40; - private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class); transient private String baseDir; - transient private int maxId; - transient private int maxValueLength; - transient private int nValues; - - volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap; - - // Constructor both for build and deserialize - public AppendTrieDictionary() { - enableCache(); - } + transient private GlobalDictMetadata metadata; + transient private LoadingCache<DictSliceKey, DictSlice> dictCache; - public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException { + public void init(String baseDir) throws IOException { this.baseDir = baseDir; - this.baseId = baseId; - this.maxId = maxId; - this.maxValueLength = maxValueLength; - this.nValues = nValues; - this.bytesConvert = bytesConverter; - } - - public void initDictSliceMap(CachedTreeMap dictMap) throws IOException { - int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); - long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); - CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); - newDictSliceMap.loadEntry(dictMap); - this.dictSliceMap = newDictSliceMap; - } - - public byte[] writeDictMap() throws IOException { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(buf); - ((Writable) dictSliceMap).write(out); - byte[] dictMapBytes = buf.toByteArray(); - buf.close(); - out.close(); - - return dictMapBytes; - } - - // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state - public static void checkValidId(int id) { - if (id == 0 || id == -1) { - throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); - } - } - - public static class DictSliceKey implements WritableComparable, java.io.Serializable { - byte[] key; - - public static DictSliceKey wrap(byte[] key) { - DictSliceKey dictKey = new DictSliceKey(); - dictKey.key = key; - return dictKey; - } - - @Override - public String toString() { - return Bytes.toStringBinary(key); - } - - @Override - public int hashCode() { - return Arrays.hashCode(key); - } - - @Override - public int compareTo(Object o) { - if (!(o instanceof DictSliceKey)) { - return -1; - } - DictSliceKey other = (DictSliceKey) o; - return Bytes.compareTo(key, other.key); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(key.length); - out.write(key); - } - - @Override - public void readFields(DataInput in) throws IOException { - key = new byte[in.readInt()]; - in.readFully(key); - } - } - - public static class DictSlice<T> implements Writable, java.io.Serializable { - public DictSlice() { - } - - public 
DictSlice(byte[] trieBytes) { - init(trieBytes); - } - - private byte[] trieBytes; - - // non-persistent part - transient private int headSize; - transient private int bodyLen; - transient private int sizeChildOffset; - - transient private int nValues; - transient private int sizeOfId; - // mask MUST be long, since childOffset maybe 5 bytes at most - transient private long childOffsetMask; - transient private int firstByteOffset; - - private void init(byte[] trieBytes) { - this.trieBytes = trieBytes; - if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0) - throw new IllegalArgumentException("Wrong file type (magic does not match)"); - - try { - DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I)); - this.headSize = headIn.readShort(); - this.bodyLen = headIn.readInt(); - this.nValues = headIn.readInt(); - this.sizeChildOffset = headIn.read(); - this.sizeOfId = headIn.read(); - - this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8)); - this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte - } catch (Exception e) { - if (e instanceof RuntimeException) - throw (RuntimeException) e; - else - throw new RuntimeException(e); - } - } - - public byte[] getFirstValue() { - int nodeOffset = headSize; - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - while (true) { - int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1); - bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen); - if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) { - break; - } - nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); - if (nodeOffset == headSize) { - break; - } - } - return bytes.toByteArray(); - } - - /** - * returns a code point from [0, nValues), preserving order of value - * - * @param n - * -- the offset of current node - * @param inp - * -- input value bytes to lookup - * @param o - * -- offset in the input value bytes matched so far - * @param inpEnd - * -- end of input - * @param roundingFlag - * -- =0: return -1 if not found - * -- <0: return closest smaller if not found, return -1 - * -- >0: return closest bigger if not found, return nValues - */ - private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) { - while (true) { - // match the current node - int p = n + firstByteOffset; // start of node's value - int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value - for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0] - if (trieBytes[p] != inp[o]) { - return -1; // mismatch - } - } - - // node completely matched, is input all consumed? - boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); - if (o == inpEnd) { - return p == end && isEndOfValue ? 
BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1; - } - - // find a child to continue - int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); - if (c == headSize) // has no children - return -1; - byte inpByte = inp[o]; - int comp; - while (true) { - p = c + firstByteOffset; - comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte); - if (comp == 0) { // continue in the matching child, reset n and loop again - n = c; - break; - } else if (comp < 0) { // try next child - if (checkFlag(c, BIT_IS_LAST_CHILD)) - return -1; - c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0); - } else { // children are ordered by their first value byte - return -1; - } - } - } - } - - private boolean checkFlag(int offset, int bit) { - return (trieBytes[offset] & bit) > 0; - } - - public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) { - int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag); - return id; - } - - private DictNode rebuildTrieTree() { - return rebuildTrieTreeR(headSize, null); - } - - private DictNode rebuildTrieTreeR(int n, DictNode parent) { - DictNode root = null; - while (true) { - int p = n + firstByteOffset; - int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); - int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); - boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); - - byte[] value = new byte[parLen]; - System.arraycopy(trieBytes, p, value, 0, parLen); - - DictNode node = new DictNode(value, isEndOfValue); - if (isEndOfValue) { - int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); - node.id = id; - } - - if (parent == null) { - root = node; - } else { - parent.addChild(node); - } - - if (childOffset != 0) { - rebuildTrieTreeR(childOffset + headSize, node); - } - - if (checkFlag(n, BIT_IS_LAST_CHILD)) { - break; - } else { - n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0); - } - } - return root; - } - - public boolean doCheck() { - int offset = headSize; - HashSet<Integer> parentSet = new HashSet<>(); - boolean lastChild = false; - - while (offset < trieBytes.length) { - if (lastChild) { - boolean contained = parentSet.remove(offset - headSize); - // Can't find parent, the data is corrupted - if (!contained) { - return false; - } - lastChild = false; - } - int p = offset + firstByteOffset; - int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask); - int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); - boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE); - - // Copy value overflow, the data is corrupted - if (trieBytes.length < p + parLen) { - return false; - } - - // Check id is fine - if (isEndOfValue) { - BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); - } - - // Record it if has children - if (childOffset != 0) { - parentSet.add(childOffset); - } - - // brothers done, move to next parent - if (checkFlag(offset, BIT_IS_LAST_CHILD)) { - lastChild = true; - } - - // move to next node - offset += firstByteOffset + parLen + (isEndOfValue ? 
sizeOfId : 0); - } - - // ParentMap is empty, meaning all nodes has parent, the data is correct - return parentSet.isEmpty(); - } - - public void write(DataOutput out) throws IOException { - out.write(trieBytes); - } - - public void readFields(DataInput in) throws IOException { - byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE]; - in.readFully(headPartial); - - if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0) - throw new IllegalArgumentException("Wrong file type (magic does not match)"); - - DataInputStream headIn = new DataInputStream(// - new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); - int headSize = headIn.readShort(); - int bodyLen = headIn.readInt(); - headIn.close(); - - byte[] all = new byte[headSize + bodyLen]; - System.arraycopy(headPartial, 0, all, 0, headPartial.length); - in.readFully(all, headPartial.length, all.length - headPartial.length); - - init(all); - } - - public static DictNode rebuildNodeByDeserialize(DataInput in) throws IOException { - DictSlice slice = new DictSlice(); - slice.readFields(in); - return slice.rebuildTrieTree(); - } - - @Override - public String toString() { - return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen); - } - - @Override - public int hashCode() { - return Arrays.hashCode(trieBytes); - } - - @Override - public boolean equals(Object o) { - if ((o instanceof AppendTrieDictionary.DictSlice) == false) { - logger.info("Equals return false because it's not DictInfo"); - return false; - } - DictSlice that = (DictSlice) o; - return Arrays.equals(this.trieBytes, that.trieBytes); - } - } - - public static class DictNode implements Writable, java.io.Serializable { - public byte[] part; - public int id = -1; - public boolean isEndOfValue; - public ArrayList<DictNode> children = new ArrayList<>(); - - public int nValuesBeneath; - public DictNode parent; - public int childrenCount = 1; - - public DictNode() { - } - - public void clone(DictNode o) { - this.part = o.part; - this.id = o.id; - this.isEndOfValue = o.isEndOfValue; - this.children = o.children; - for (DictNode child : o.children) { - child.parent = this; - } - this.nValuesBeneath = o.nValuesBeneath; - this.parent = o.parent; - this.childrenCount = o.childrenCount; - } - - DictNode(byte[] value, boolean isEndOfValue) { - reset(value, isEndOfValue); - } - - DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) { - reset(value, isEndOfValue, children); - } - - void reset(byte[] value, boolean isEndOfValue) { - reset(value, isEndOfValue, new ArrayList<DictNode>()); - } - - void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) { - this.part = value; - this.isEndOfValue = isEndOfValue; - clearChild(); - for (DictNode child : children) { - addChild(child); - } - this.id = -1; - } - - void clearChild() { - this.children.clear(); - int childrenCountDelta = this.childrenCount - 1; - for (DictNode p = this; p != null; p = p.parent) { - p.childrenCount -= childrenCountDelta; - } - } - - void addChild(DictNode child) { - addChild(-1, child); - } - - void addChild(int index, DictNode child) { - child.parent = this; - if (index < 0) { - this.children.add(child); - } else { - this.children.add(index, child); - } - for (DictNode p = this; p != null; p = p.parent) { - p.childrenCount += child.childrenCount; - } - } - - public DictNode removeChild(int index) { - DictNode 
child = children.remove(index); - child.parent = null; - for (DictNode p = this; p != null; p = p.parent) { - p.childrenCount -= child.childrenCount; - } - return child; - } - - public DictNode duplicateNode() { - DictNode newChild = new DictNode(part, false); - newChild.parent = parent; - if (parent != null) { - int index = parent.children.indexOf(this); - parent.addChild(index + 1, newChild); - } - return newChild; - } - - public byte[] firstValue() { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DictNode p = this; - while (true) { - bytes.write(p.part, 0, p.part.length); - if (p.isEndOfValue || p.children.size() == 0) { - break; - } - p = p.children.get(0); - } - return bytes.toByteArray(); - } - - public static DictNode splitNodeTree(DictNode splitNode) { - if (splitNode == null) { - return null; - } - DictNode current = splitNode; - DictNode p = current.parent; - while (p != null) { - int index = p.children.indexOf(current); - assert index != -1; - DictNode newParent = p.duplicateNode(); - for (int i = p.children.size() - 1; i >= index; i--) { - DictNode child = p.removeChild(i); - newParent.addChild(0, child); - } - current = newParent; - p = p.parent; - } - return current; - } - - public static void mergeSingleByteNode(DictNode root, int leftOrRight) { - DictNode current = root; - DictNode child; - while (!current.children.isEmpty()) { - child = leftOrRight == 0 ? current.children.get(0) : current.children.get(current.children.size() - 1); - if (current.children.size() > 1 || current.isEndOfValue) { - current = child; - continue; - } - byte[] newValue = new byte[current.part.length + child.part.length]; - System.arraycopy(current.part, 0, newValue, 0, current.part.length); - System.arraycopy(child.part, 0, newValue, current.part.length, child.part.length); - current.reset(newValue, child.isEndOfValue, child.children); - current.id = child.id; - } - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] bytes = buildTrieBytes(); - out.write(bytes); - } - - @Override - public void readFields(DataInput in) throws IOException { - DictNode root = DictSlice.rebuildNodeByDeserialize(in); - this.clone(root); - } - - protected byte[] buildTrieBytes() { - Stats stats = Stats.stats(this); - int sizeChildOffset = stats.mbpn_sizeChildOffset; - int sizeId = stats.mbpn_sizeId; - - // write head - byte[] head; - try { - ByteArrayOutputStream byteBuf = new ByteArrayOutputStream(); - DataOutputStream headOut = new DataOutputStream(byteBuf); - headOut.write(AppendTrieDictionary.HEAD_MAGIC); - headOut.writeShort(0); // head size, will back fill - headOut.writeInt(stats.mbpn_footprint); // body size - headOut.writeInt(stats.nValues); - headOut.write(sizeChildOffset); - headOut.write(sizeId); - headOut.close(); - head = byteBuf.toByteArray(); - BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2); - } catch (IOException e) { - throw new RuntimeException(e); // shall not happen, as we are - } - - byte[] trieBytes = new byte[stats.mbpn_footprint + head.length]; - System.arraycopy(head, 0, trieBytes, 0, head.length); - - LinkedList<DictNode> open = new LinkedList<DictNode>(); - IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>(); - - // write body - int o = head.length; - offsetMap.put(this, o); - o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes); - if (this.children.isEmpty() == false) - open.addLast(this); - - while (open.isEmpty() == false) { - DictNode parent = 
open.removeFirst(); - build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes); - for (int i = 0; i < parent.children.size(); i++) { - DictNode c = parent.children.get(i); - boolean isLastChild = (i == parent.children.size() - 1); - offsetMap.put(c, o); - o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes); - if (c.children.isEmpty() == false) - open.addLast(c); - } - } - - if (o != trieBytes.length) - throw new RuntimeException(); - return trieBytes; - } - - private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) { - int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE); - BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset); - trieBytes[parentOffset] |= flags; - } - - private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) { - int o = offset; - - // childOffset - if (isLastChild) - trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD; - if (n.isEndOfValue) - trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE; - o += sizeChildOffset; - - // nValueBytes - if (n.part.length > 255) - throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part)); - BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); - o++; - - // valueBytes - System.arraycopy(n.part, 0, trieBytes, o, n.part.length); - o += n.part.length; - - if (n.isEndOfValue) { - checkValidId(n.id); - BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId); - o += sizeId; - } - - return o; - } - - @Override - public String toString() { - return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue())); - } - } - - public static class Stats { - public interface Visitor { - void visit(DictNode n, int level); - } - - private static void traverseR(DictNode node, Visitor visitor, int level) { - visitor.visit(node, level); - for (DictNode c : node.children) - traverseR(c, visitor, level + 1); - } - - private static void traversePostOrderR(DictNode node, Visitor visitor, int level) { - for (DictNode c : node.children) - traversePostOrderR(c, visitor, level + 1); - visitor.visit(node, level); - } - - public int nValues; // number of values in total - public int nValueBytesPlain; // number of bytes for all values - // uncompressed - public int nValueBytesCompressed; // number of values bytes in Trie - // (compressed) - public int maxValueLength; // size of longest value in bytes - - // the trie is multi-byte-per-node - public int mbpn_nNodes; // number of nodes in trie - public int mbpn_trieDepth; // depth of trie - public int mbpn_maxFanOut; // the maximum no. 
children - public int mbpn_nChildLookups; // number of child lookups during lookup - // every value once - public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every - // value once - public int mbpn_sizeValueTotal; // the sum of value space in all nodes - public int mbpn_sizeNoValueBytes; // size of field noValueBytes - public int mbpn_sizeChildOffset; // size of field childOffset, points to - // first child in flattened array - public int mbpn_sizeId; // size of id value, always be 4 - public int mbpn_footprint; // MBPN footprint in bytes - - /** - * out print some statistics of the trie and the dictionary built from it - */ - public static Stats stats(DictNode root) { - // calculate nEndValueBeneath - traversePostOrderR(root, new Visitor() { - @Override - public void visit(DictNode n, int level) { - n.nValuesBeneath = n.isEndOfValue ? 1 : 0; - for (DictNode c : n.children) - n.nValuesBeneath += c.nValuesBeneath; - } - }, 0); - - // run stats - final Stats s = new Stats(); - final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>(); - traverseR(root, new Visitor() { - @Override - public void visit(DictNode n, int level) { - if (n.isEndOfValue) - s.nValues++; - s.nValueBytesPlain += n.part.length * n.nValuesBeneath; - s.nValueBytesCompressed += n.part.length; - s.mbpn_nNodes++; - if (s.mbpn_trieDepth < level + 1) - s.mbpn_trieDepth = level + 1; - if (n.children.size() > 0) { - if (s.mbpn_maxFanOut < n.children.size()) - s.mbpn_maxFanOut = n.children.size(); - int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0); - s.mbpn_nChildLookups += childLookups; - s.mbpn_nTotalFanOut += childLookups * n.children.size(); - } - - if (level < lenAtLvl.size()) - lenAtLvl.set(level, n.part.length); - else - lenAtLvl.add(n.part.length); - int lenSoFar = 0; - for (int i = 0; i <= level; i++) - lenSoFar += lenAtLvl.get(i); - if (lenSoFar > s.maxValueLength) - s.maxValueLength = lenSoFar; - } - }, 0); - - // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode - s.mbpn_sizeId = 4; - s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId; - s.mbpn_sizeNoValueBytes = 1; - s.mbpn_sizeChildOffset = 5; - s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset); - while (true) { // minimize the offset size to match the footprint - int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1); - // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag - // expand t to long before *4, avoiding exceed Integer.MAX_VALUE - if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) { - s.mbpn_sizeChildOffset--; - s.mbpn_footprint = t; - } else - break; - } - - return s; - } - - /** - * out print trie for debug - */ - public void print(DictNode root) { - print(root, System.out); - } - - public void print(DictNode root, final PrintStream out) { - traverseR(root, new Visitor() { - @Override - public void visit(DictNode n, int level) { - try { - for (int i = 0; i < level; i++) - out.print(" "); - out.print(new String(n.part, "UTF-8")); - out.print(" - "); - if (n.nValuesBeneath > 0) - out.print(n.nValuesBeneath); - if (n.isEndOfValue) - out.print("* [" + n.id + "]"); - out.print("\n"); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - }, 0); - } - } - - public static class Builder<T> { - private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap(); - - 
public static Builder getInstance(String resourcePath) throws IOException { - return getInstance(resourcePath, null); - } - - public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException { - Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath); - if (entry == null) { - entry = new Pair<>(0, createNewBuilder(resourcePath, dict)); - builderInstanceAndCountMap.put(resourcePath, entry); - } - entry.setFirst(entry.getFirst() + 1); - return entry.getSecond(); - } - - // return true if entry still in map - private synchronized static boolean releaseInstance(String resourcePath) { - Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath); - if (entry != null) { - entry.setFirst(entry.getFirst() - 1); - if (entry.getFirst() <= 0) { - builderInstanceAndCountMap.remove(resourcePath); - return false; - } - return true; - } - return false; - } - - public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException { - String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/"; - - AppendTrieDictionary dictToUse = existDict; - if (dictToUse == null) { - // Try to load the existing dict from cache, making sure there's only the same one object in memory - NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath); - ArrayList<String> appendDicts = new ArrayList<>(); - if (dicts != null && !dicts.isEmpty()) { - for (String dict : dicts) { - DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); - if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { - appendDicts.add(dict); - } - } - } - if (appendDicts.isEmpty()) { - dictToUse = null; - } else if (appendDicts.size() == 1) { - dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); - } else { - throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size())); - } - } - - AppendTrieDictionary.Builder<String> builder; - if (dictToUse == null) { - logger.info("GlobalDict {} is empty, create new one", resourcePath); - builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null); - } else { - logger.info("GlobalDict {} exist, append value", resourcePath); - builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength, dictToUse.nValues, dictToUse.bytesConvert, dictToUse.writeDictMap()); - } - - return builder; - } - - private final String resourcePath; - private final String baseDir; - private int maxId; - private int maxValueLength; - private int nValues; - private final BytesConverter<T> bytesConverter; - - private final AppendTrieDictionary dict; - - private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap; - private int MAX_ENTRY_IN_SLICE = 10_000_000; - private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0; - - private int processedCount = 0; - - // Constructor for a new Dict - private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException { - this.resourcePath = 
resourcePath; - if (dict == null) { - this.dict = new AppendTrieDictionary<T>(); - } else { - this.dict = dict; - } - this.baseDir = baseDir; - this.maxId = maxId; - this.maxValueLength = maxValueLength; - this.nValues = nValues; - this.bytesConverter = bytesConverter; - - MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); - int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); - long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); - // create a new cached map with baseDir - mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build(); - if (dictMapBytes != null) { - ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes))); - } - } - - public void addValue(T value) { - addValue(bytesConverter.convertToBytes(value)); - } - - private synchronized void addValue(byte[] value) { - if (++processedCount % 1_000_000 == 0) { - logger.debug("add value count " + processedCount); - } - maxValueLength = Math.max(maxValueLength, value.length); - - if (mutableDictSliceMap.isEmpty()) { - DictNode root = new DictNode(new byte[0], false); - mutableDictSliceMap.put(DictSliceKey.wrap(new byte[0]), root); - } - DictSliceKey sliceKey = mutableDictSliceMap.floorKey(DictSliceKey.wrap(value)); - if (sliceKey == null) { - sliceKey = mutableDictSliceMap.firstKey(); - } - DictNode root = mutableDictSliceMap.get(sliceKey); - addValueR(root, value, 0); - if (root.childrenCount > MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR) { - mutableDictSliceMap.remove(sliceKey); - DictNode newRoot = splitNodeTree(root); - DictNode.mergeSingleByteNode(root, 1); - DictNode.mergeSingleByteNode(newRoot, 0); - mutableDictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root); - mutableDictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot); - } - } - - private DictNode splitNodeTree(DictNode root) { - DictNode parent = root; - DictNode splitNode; - int childCountToSplit = (int) (MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR / 2); - while (true) { - List<DictNode> children = parent.children; - if (children.size() == 0) { - splitNode = parent; - break; - } else if (children.size() == 1) { - parent = children.get(0); - continue; - } else { - for (int i = children.size() - 1; i >= 0; i--) { - parent = children.get(i); - if (childCountToSplit > children.get(i).childrenCount) { - childCountToSplit -= children.get(i).childrenCount; - } else { - childCountToSplit--; - break; - } - } - } - } - return DictNode.splitNodeTree(splitNode); - } - - private int createNextId() { - int id = ++maxId; - checkValidId(id); - nValues++; - return id; - } - - // Only used for test - public void setMaxId(int id) { - this.maxId = id; - } - - // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree - private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) { - DictNode head = null; - DictNode current = null; - for (; start + 255 < end; start += 255) { - DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false); - if (head == null) { - head = c; - current = c; - } else { - current.addChild(c); - current = c; - } - } - DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true); - last.id = createNextId(); - if (head == null) { - head = last; - } else { - current.addChild(last); - 
} - return head; - } - - private void addValueR(DictNode node, byte[] value, int start) { - // match the value part of current node - int i = 0, j = start; - int n = node.part.length, nn = value.length; - int comp = 0; - for (; i < n && j < nn; i++, j++) { - comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]); - if (comp != 0) - break; - } - - if (j == nn) { - // if value fully matched within the current node - if (i == n) { - // if equals to current node, just mark end of value - if (!node.isEndOfValue) { - // if the first match, assign an Id to nodt - node.id = createNextId(); - } - node.isEndOfValue = true; - } else { - // otherwise, split the current node into two - DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); - c.id = node.id; - node.reset(BytesUtil.subarray(node.part, 0, i), true); - node.addChild(c); - node.id = createNextId(); - } - return; - } - - // if partially matched the current, split the current node, add the new - // value, make a 3-way - if (i < n) { - DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); - c1.id = node.id; - DictNode c2 = addNodeMaybeOverflow(value, j, nn); - node.reset(BytesUtil.subarray(node.part, 0, i), false); - if (comp < 0) { - node.addChild(c1); - node.addChild(c2); - } else { - node.addChild(c2); - node.addChild(c1); - } - return; - } - - // out matched the current, binary search the next byte for a child node - // to continue - byte lookfor = value[j]; - int lo = 0; - int hi = node.children.size() - 1; - int mid = 0; - boolean found = false; - comp = 0; - while (!found && lo <= hi) { - mid = lo + (hi - lo) / 2; - DictNode c = node.children.get(mid); - comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]); - if (comp < 0) - hi = mid - 1; - else if (comp > 0) - lo = mid + 1; - else - found = true; - } - if (found) { - // found a child node matching the first byte, continue in that child - addValueR(node.children.get(mid), value, j); - } else { - // otherwise, make the value a new child - DictNode c = addNodeMaybeOverflow(value, j, nn); - node.addChild(comp <= 0 ? 
mid : mid + 1, c); - } - } - - public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException { - boolean keepAppend = releaseInstance(resourcePath); - CachedTreeMap dictSliceMap = (CachedTreeMap) mutableDictSliceMap; - dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter); - dict.flushIndex(dictSliceMap, keepAppend); - dict.initDictSliceMap(dictSliceMap); - - return dict; - } + final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(baseDir); + Long[] versions = globalDictStore.listAllVersions(); + checkState(versions.length > 0, "Global dict at %s is empty", baseDir); + final long latestVersion = versions[versions.length - 1]; + final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion); + this.metadata = globalDictStore.getMetadata(latestVersion); + this.bytesConvert = metadata.bytesConverter; + this.dictCache = CacheBuilder.newBuilder().softValues().removalListener(new RemovalListener<DictSliceKey, DictSlice>() { + @Override + public void onRemoval(RemovalNotification<DictSliceKey, DictSlice> notification) { + logger.info("Evict slice with key {} and value {} caused by {}, size {}/{}", notification.getKey(), notification.getValue(), notification.getCause(), dictCache.size(), metadata.sliceFileMap.size()); + } + }).build(new CacheLoader<DictSliceKey, DictSlice>() { + @Override + public DictSlice load(DictSliceKey key) throws Exception { + DictSlice slice = globalDictStore.readSlice(latestVersionPath.toString(), metadata.sliceFileMap.get(key)); + logger.info("Load slice with key {} and value {}", key, slice); + return slice; + } + }); } @Override protected int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) { - if (dictSliceMap.isEmpty()) { - return -1; - } - byte[] tempVal = new byte[len]; - System.arraycopy(value, offset, tempVal, 0, len); - DictSliceKey sliceKey = dictSliceMap.floorKey(DictSliceKey.wrap(tempVal)); + byte[] val = Arrays.copyOfRange(value, offset, offset + len); + DictSliceKey sliceKey = metadata.sliceFileMap.floorKey(DictSliceKey.wrap(val)); if (sliceKey == null) { - sliceKey = dictSliceMap.firstKey(); + sliceKey = metadata.sliceFileMap.firstKey(); } - DictSlice slice = dictSliceMap.get(sliceKey); - int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag); - return id; - } - - @Override - protected byte[] getValueBytesFromIdWithoutCache(int id) { - throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id"); + DictSlice slice; + try { + slice = dictCache.get(sliceKey); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to load slice with key " + sliceKey, e.getCause()); + } + return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag); } @Override public int getMinId() { - return baseId; + return metadata.baseId; } @Override public int getMaxId() { - return maxId; + return metadata.maxId; } @Override public int getSizeOfId() { - return 4; + return Integer.SIZE / Byte.SIZE; } @Override public int getSizeOfValue() { - return maxValueLength; + return metadata.maxValueLength; } - public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException { - try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) { - indexOut.writeInt(baseId); - indexOut.writeInt(maxId); - indexOut.writeInt(maxValueLength); - indexOut.writeInt(nValues); - indexOut.writeUTF(bytesConvert.getClass().getName()); - dictSliceMap.write(indexOut); - dictSliceMap.commit(keepAppend); - } + 
@Override + protected byte[] getValueBytesFromIdWithoutCache(int id) { + throw new UnsupportedOperationException("AppendTrieDictionary can't retrieve value from id"); } @Override public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { - //copy appendDict - Path base = new Path(baseDir); - FileSystem srcFs = HadoopUtil.getFileSystem(base); - Path srcPath = CachedTreeMap.getLatestVersion(HadoopUtil.getCurrentConfiguration(), srcFs, base); - Path dstPath = new Path(srcPath.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory())); - logger.info("Copy appendDict from {} to {}", srcPath, dstPath); - - FileSystem dstFs = HadoopUtil.getFileSystem(dstPath); - if (dstFs.exists(dstPath)) { - logger.info("Delete existing AppendDict {}", dstPath); - dstFs.delete(dstPath, true); - } - FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, HadoopUtil.getCurrentConfiguration()); - - // init new AppendTrieDictionary + GlobalDictStore store = new GlobalDictHDFSStore(baseDir); + String dstBaseDir = store.copyToAnotherMeta(srcConfig, dstConfig); AppendTrieDictionary newDict = new AppendTrieDictionary(); - newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConvert); - newDict.initDictSliceMap((CachedTreeMap) dictSliceMap); - + newDict.init(dstBaseDir); return newDict; } @@ -1192,34 +144,12 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> { @Override public void readFields(DataInput in) throws IOException { - String baseDir = in.readUTF(); - Configuration conf = HadoopUtil.getCurrentConfiguration(); - try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) { - int baseId = input.readInt(); - int maxId = input.readInt(); - int maxValueLength = input.readInt(); - int nValues = input.readInt(); - String converterName = input.readUTF(); - BytesConverter converter = null; - if (converterName.isEmpty() == false) { - try { - converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance(); - } catch (Exception e) { - throw new IOException(e); - } - } - initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter); - - // Create instance for deserialize data, and update to map in dict - CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); - dictMap.readFields(input); - initDictSliceMap(dictMap); - } + init(in.readUTF()); } @Override public void dump(PrintStream out) { - out.println("Total " + nValues + " values, " + (dictSliceMap == null ? 
0 : dictSliceMap.size()) + " slice"); + out.println(String.format("Total %d values and %d slices", metadata.nValues, metadata.sliceFileMap.size())); } @Override @@ -1248,5 +178,4 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> { public boolean contains(Dictionary other) { return false; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java new file mode 100644 index 0000000..bfd664f --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BytesUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.TreeMap; + +import static com.google.common.base.Preconditions.checkState; + +public class AppendTrieDictionaryBuilder { + private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class); + + private final String baseDir; + private final String workingDir; + private final int maxEntriesPerSlice; + + private GlobalDictStore store; + private int maxId; + private int maxValueLength; + private int nValues; + private BytesConverter bytesConverter; + private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name + private int counter; + + private DictSliceKey curKey; + private DictNode curNode; + + public AppendTrieDictionaryBuilder(String resourceDir, int maxEntriesPerSlice) throws IOException { + this.baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourceDir + "/"; + this.workingDir = this.baseDir + "/working"; + this.maxEntriesPerSlice = maxEntriesPerSlice; + init(); + } + + public synchronized void init() throws IOException { + this.store = new GlobalDictHDFSStore(baseDir); + store.prepareForWrite(workingDir); + + Long[] versions = store.listAllVersions(); + + if (versions.length == 0) { // build dict for the first time + this.maxId = 0; + this.maxValueLength = 0; + this.nValues = 0; + this.bytesConverter = new StringBytesConverter(); + + } else { // append values to last version + GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]); + this.maxId = 
metadata.maxId; + this.maxValueLength = metadata.maxValueLength; + this.nValues = metadata.nValues; + this.bytesConverter = metadata.bytesConverter; + this.sliceFileMap = new TreeMap<>(metadata.sliceFileMap); + } + } + + @SuppressWarnings("unchecked") + public void addValue(String value) { + if (counter++ > 0 && counter % 1_000_000 == 0) { + logger.info("processed {} values", counter); + } + + byte[] valueBytes = bytesConverter.convertToBytes(value); + + if (sliceFileMap.isEmpty()) { + curNode = new DictNode(new byte[0], false); + sliceFileMap.put(DictSliceKey.START_KEY, null); + } + checkState(sliceFileMap.firstKey().equals(DictSliceKey.START_KEY), "first key should be \"\", but got \"%s\"", sliceFileMap.firstKey()); + + DictSliceKey nextKey = sliceFileMap.floorKey(DictSliceKey.wrap(valueBytes)); + + if (curKey != null && !nextKey.equals(curKey)) { + // you may suppose nextKey>=curKey, but nextKey<curKey could happen when a node splits. + // for example, suppose we have curNode [1-10], and add value "2" triggers split: + // first half [1-5] is flushed out, make second half [6-10] curNode and "6" curKey. + // then value "3" is added, now we got nextKey "1" smaller than curKey "6", surprise! + // in this case, we need to flush [6-10] and read [1-5] back. + flushCurrentNode(); + curNode = null; + } + if (curNode == null) { // read next slice + DictSlice slice = store.readSlice(workingDir, sliceFileMap.get(nextKey)); + curNode = slice.rebuildTrieTree(); + } + curKey = nextKey; + + addValueR(curNode, valueBytes, 0); + + // split slice if it's too large + if (curNode.childrenCount > maxEntriesPerSlice) { + DictNode newRoot = splitNodeTree(curNode); + flushCurrentNode(); + curNode = newRoot; + curKey = DictSliceKey.wrap(newRoot.firstValue()); + sliceFileMap.put(curKey, null); + } + + maxValueLength = Math.max(maxValueLength, valueBytes.length); + } + + public AppendTrieDictionary build(int baseId) throws IOException { + if (curNode != null) { + flushCurrentNode(); + } + + GlobalDictMetadata metadata = new GlobalDictMetadata(baseId, this.maxId, this.maxValueLength, this.nValues, this.bytesConverter, sliceFileMap); + store.commit(workingDir, metadata); + + AppendTrieDictionary dict = new AppendTrieDictionary(); + dict.init(this.baseDir); + return dict; + } + + private void flushCurrentNode() { + String newSliceFile = store.writeSlice(workingDir, curKey, curNode); + String oldSliceFile = sliceFileMap.put(curKey, newSliceFile); + if (oldSliceFile != null) { + store.deleteSlice(workingDir, oldSliceFile); + } + } + + private void addValueR(DictNode node, byte[] value, int start) { + // match the value part of current node + int i = 0, j = start; + int n = node.part.length, nn = value.length; + int comp = 0; + for (; i < n && j < nn; i++, j++) { + comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]); + if (comp != 0) + break; + } + + if (j == nn) { + // if value fully matched within the current node + if (i == n) { + // on first match, mark end of value and assign an ID + if (!node.isEndOfValue) { + node.id = createNextId(); + node.isEndOfValue = true; + } + } else { + // otherwise, split the current node into two + DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + c.id = node.id; + node.reset(BytesUtil.subarray(node.part, 0, i), true); + node.addChild(c); + node.id = createNextId(); + } + return; + } + + // if partially matched the current, split the current node, add the new + // value, make a 3-way + if (i < n) { + DictNode c1 = new 
DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + c1.id = node.id; + DictNode c2 = addNodeMaybeOverflow(value, j, nn); + node.reset(BytesUtil.subarray(node.part, 0, i), false); + if (comp < 0) { + node.addChild(c1); + node.addChild(c2); + } else { + node.addChild(c2); + node.addChild(c1); + } + return; + } + + // out matched the current, binary search the next byte for a child node + // to continue + byte lookfor = value[j]; + int lo = 0; + int hi = node.children.size() - 1; + int mid = 0; + boolean found = false; + comp = 0; + while (!found && lo <= hi) { + mid = lo + (hi - lo) / 2; + DictNode c = node.children.get(mid); + comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]); + if (comp < 0) + hi = mid - 1; + else if (comp > 0) + lo = mid + 1; + else + found = true; + } + if (found) { + // found a child node matching the first byte, continue in that child + addValueR(node.children.get(mid), value, j); + } else { + // otherwise, make the value a new child + DictNode c = addNodeMaybeOverflow(value, j, nn); + node.addChild(comp <= 0 ? mid : mid + 1, c); + } + } + + private int createNextId() { + int id = ++maxId; + checkValidId(id); + nValues++; + return id; + } + + // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state + private void checkValidId(int id) { + if (id == 0 || id == -1) { + throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); + } + } + + // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree + private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) { + DictNode head = null; + DictNode current = null; + for (; start + 255 < end; start += 255) { + DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false); + if (head == null) { + head = c; + current = c; + } else { + current.addChild(c); + current = c; + } + } + DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true); + last.id = createNextId(); + if (head == null) { + head = last; + } else { + current.addChild(last); + } + return head; + } + + private DictNode splitNodeTree(DictNode root) { + DictNode parent = root; + int childCountToSplit = (int) (maxEntriesPerSlice * 1.0 / 2); + while (true) { + List<DictNode> children = parent.children; + if (children.size() == 0) { + break; + } + if (children.size() == 1) { + parent = children.get(0); + } else { + for (int i = children.size() - 1; i >= 0; i--) { + parent = children.get(i); + if (childCountToSplit > children.get(i).childrenCount) { + childCountToSplit -= children.get(i).childrenCount; + } else { + childCountToSplit--; + break; + } + } + } + } + return DictNode.splitNodeTree(parent); + } + + // Only used for test + void setMaxId(int id) { + this.maxId = id; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java index 4b3817a..b7c39fa 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path; import 
org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; +import static org.apache.kylin.dict.GlobalDictHDFSStore.BUFFER_SIZE; + /** * Created by sunyerui on 16/11/15. */ @@ -67,16 +69,15 @@ public class AppendTrieDictionaryChecker { listDictSlicePath(fs, status, list); } } else { - if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) { + if (path.getPath().getName().startsWith(GlobalDictHDFSStore.IndexFormatV1.SLICE_PREFIX)) { list.add(path.getPath()); } } } public boolean doCheck(FileSystem fs, Path filePath) { - try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) { - AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice(); - slice.readFields(input); + try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) { + DictSlice slice = DictSlice.deserializeFrom(input); return slice.doCheck(); } catch (Exception e) { return false; http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java deleted file mode 100644 index ee69df7..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.AbstractCollection; -import java.util.Collection; -import java.util.Iterator; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ExecutionException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.kylin.common.util.HadoopUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -/** - * Created by sunyerui on 16/5/2. 
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
deleted file mode 100644
index ee69df7..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractCollection;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-/**
- * Created by sunyerui on 16/5/2.
- * TODO Depends on HDFS for now, ideally just depends on storage interface
- */
-public class CachedTreeMap<K extends WritableComparable, V extends Writable> extends TreeMap<K, V> implements Writable {
-    private static final Logger logger = LoggerFactory.getLogger(CachedTreeMap.class);
-
-    private final Class<K> keyClazz;
-    private final Class<V> valueClazz;
-    transient volatile Collection<V> values;
-    private final LoadingCache<K, V> valueCache;
-    private final Configuration conf;
-    private final Path baseDir;
-    private final Path versionDir;
-    private final Path workingDir;
-    private final FileSystem fs;
-    private final boolean immutable;
-    private final int maxVersions;
-    private final long versionTTL;
-    private boolean keepAppend;
-
-    public static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
-    public static final String CACHED_PREFIX = "cached_";
-    public static final String VERSION_PREFIX = "version_";
-
-    public static class CachedTreeMapBuilder<K, V> {
-        private Class<K> keyClazz;
-        private Class<V> valueClazz;
-        private int maxCount = 8;
-        private String baseDir;
-        private boolean immutable;
-        private int maxVersions;
-        private long versionTTL;
-
-        public static CachedTreeMapBuilder newBuilder() {
-            return new CachedTreeMapBuilder();
-        }
-
-        private CachedTreeMapBuilder() {
-        }
-
-        public CachedTreeMapBuilder keyClazz(Class<K> clazz) {
-            this.keyClazz = clazz;
-            return this;
-        }
-
-        public CachedTreeMapBuilder valueClazz(Class<V> clazz) {
-            this.valueClazz = clazz;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> maxSize(int maxCount) {
-            this.maxCount = maxCount;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> baseDir(String baseDir) {
-            this.baseDir = baseDir;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
-            this.immutable = immutable;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) {
-            this.maxVersions = maxVersions;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) {
-            this.versionTTL = versionTTL;
-            return this;
-        }
-
-        public CachedTreeMap build() throws IOException {
-            if (baseDir == null) {
-                throw new RuntimeException("CachedTreeMap need a baseDir to cache data");
-            }
-            if (keyClazz == null || valueClazz == null) {
-                throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
-            }
-            CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL);
-            return map;
-        }
-    }
-
-    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath,
-            boolean immutable, int maxVersions, long versionTTL) throws IOException {
-        super();
-        this.keyClazz = keyClazz;
-        this.valueClazz = valueClazz;
-        this.immutable = immutable;
-        this.keepAppend = true;
-        this.maxVersions = maxVersions;
-        this.versionTTL = versionTTL;
-        this.conf = HadoopUtil.getCurrentConfiguration();
-        if (basePath.endsWith("/")) {
-            basePath = basePath.substring(0, basePath.length()-1);
-        }
-        this.baseDir = new Path(basePath);
-        this.fs = HadoopUtil.getFileSystem(baseDir, conf);
-        if (!fs.exists(baseDir)) {
-            fs.mkdirs(baseDir);
-        }
-        this.versionDir = getLatestVersion(conf, fs, baseDir);
-        this.workingDir = new Path(baseDir, "working");
-        if (!this.immutable) {
-            // For mutable map, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
-            if (fs.exists(workingDir)) {
-                fs.delete(workingDir, true);
-            }
-            FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf);
-        }
-        CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
-            @Override
-            public void onRemoval(RemovalNotification<K, V> notification) {
-                logger.info(String.format("Evict cache key %s(%d) with value %s caused by %s, size %d/%d ", notification.getKey(), notification.getKey().hashCode(), notification.getValue(), notification.getCause(), size(), valueCache.size()));
-                switch (notification.getCause()) {
-                case SIZE:
-                    writeValue(notification.getKey(), notification.getValue());
-                    break;
-                case EXPLICIT:
-                    deleteValue(notification.getKey());
-                    break;
-                default:
-                }
-            }
-        });
-        if (this.immutable) {
-            // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
-            builder.softValues();
-        } else {
-            builder.maximumSize(maxCount);
-        }
-        this.valueCache = builder.build(new CacheLoader<K, V>() {
-            @Override
-            public V load(K key) throws Exception {
-                V value = readValue(key);
-                logger.info(String.format("Load cache by key %s(%d) with value %s", key, key.hashCode(), value));
-                return value;
-            }
-        });
-    }
-
-    private String generateFileName(K key) {
-        String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString();
-        return file;
-    }
-
-    private String getCurrentDir() {
-        return immutable ? versionDir.toString() : workingDir.toString();
-    }
-
-    private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException {
-        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith(VERSION_PREFIX)) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        TreeSet<String> versions = new TreeSet<>();
-        for (FileStatus status : fileStatus) {
-            versions.add(status.getPath().toString());
-        }
-        return versions.toArray(new String[versions.size()]);
-    }
-
-    // only for test
-    public String getLatestVersion() throws IOException {
-        return getLatestVersion(conf, fs, baseDir).toUri().getPath();
-    }
-
-    public static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException {
-        String[] versions = listAllVersions(fs, baseDir);
-        if (versions.length > 0) {
-            return new Path(versions[versions.length - 1]);
-        } else {
-            // Old format, directly use base dir, convert to new format
-            Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
-            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
-            Path indexFile = new Path(baseDir, ".index");
-            FileStatus[] cachedFiles;
-            try {
-                cachedFiles = fs.listStatus(baseDir, new PathFilter() {
-                    @Override
-                    public boolean accept(Path path) {
-                        if (path.getName().startsWith(CACHED_PREFIX)) {
-                            return true;
-                        }
-                        return false;
-                    }
-                });
-                fs.mkdirs(tmpNewVersionDir);
-                if (fs.exists(indexFile) && cachedFiles.length > 0) {
-                    FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf);
-                    for (FileStatus file : cachedFiles) {
-                        FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf);
-                    }
-                }
-                fs.rename(tmpNewVersionDir, newVersionDir);
-                if (fs.exists(indexFile) && cachedFiles.length > 0) {
-                    fs.delete(indexFile, true);
-                    for (FileStatus file : cachedFiles) {
-                        fs.delete(file.getPath(), true);
-                    }
-                }
-            } finally {
-                if (fs.exists(tmpNewVersionDir)) {
-                    fs.delete(tmpNewVersionDir, true);
-                }
-            }
-            return newVersionDir;
-        }
-    }
-
-    public void commit(boolean keepAppend) throws IOException {
-        assert this.keepAppend && !immutable : "Only support commit method with immutable false and keepAppend true";
-
-        Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
-        if (keepAppend) {
-            // Copy to tmp dir, and rename to new version, make sure it's complete when be visible
-            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
-            try {
-                FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf);
-                fs.rename(tmpNewVersionDir, newVersionDir);
-            } finally {
-                if (fs.exists(tmpNewVersionDir)) {
-                    fs.delete(tmpNewVersionDir, true);
-                }
-            }
-        } else {
-            fs.rename(workingDir, newVersionDir);
-        }
-        this.keepAppend = keepAppend;
-
-        // Check versions count, delete expired versions
-        String[] versions = listAllVersions(fs, baseDir);
-        long timestamp = System.currentTimeMillis();
-        for (int i = 0; i < versions.length - maxVersions; i++) {
-            String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length());
-            long version = Long.parseLong(versionString);
-            if (version + versionTTL < timestamp) {
-                fs.delete(new Path(versions[i]), true);
-            }
-        }
-    }
-
-    public void loadEntry(CachedTreeMap other) {
-        for (Object key : other.keySet()) {
-            super.put((K)key, null);
-        }
-    }
-
-    private void writeValue(K key, V value) {
-        if (immutable) {
-            return;
-        }
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8L)) {
-            value.write(out);
-        } catch (Exception e) {
-            logger.error(String.format("write value into %s exception: %s", fileName, e), e);
-            throw new RuntimeException(e.getCause());
-        }
-    }
-
-    private V readValue(K key) throws Exception {
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
-            V value = valueClazz.newInstance();
-            value.readFields(input);
-            return value;
-        } catch (Exception e) {
-            logger.error(String.format("read value from %s exception: %s", fileName, e), e);
-            return null;
-        }
-    }
-
-    private void deleteValue(K key) {
-        if (immutable) {
-            return;
-        }
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try {
-            if (fs.exists(filePath)) {
-                fs.delete(filePath, true);
-            }
-        } catch (Exception e) {
-            logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
-        }
-    }
-
-    @Override
-    public V put(K key, V value) {
-        assert keepAppend && !immutable : "Only support put method with immutable false and keepAppend true";
-        super.put(key, null);
-        valueCache.put(key, value);
-        return null;
-    }
-
-    @Override
-    public V get(Object key) {
-        if (super.containsKey(key)) {
-            try {
-                return valueCache.get((K) key);
-            } catch (ExecutionException e) {
-                logger.error(String.format("get value with key %s exception: %s", key, e), e);
-                return null;
-            }
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public V remove(Object key) {
-        assert keepAppend && !immutable : "Only support remove method with immutable false keepAppend true";
-        super.remove(key);
-        valueCache.invalidate(key);
-        return null;
-    }
-
-    @Override
-    public void clear() {
-        super.clear();
-        values = null;
-        valueCache.invalidateAll();
-    }
-
-    public Collection<V> values() {
-        Collection<V> vs = values;
-        return (vs != null) ? vs : (values = new Values());
-    }
-
-    class Values extends AbstractCollection<V> {
-        @Override
-        public Iterator<V> iterator() {
-            return new ValueIterator<>();
-        }
-
-        @Override
-        public int size() {
-            return CachedTreeMap.this.size();
-        }
-    }
-
-    class ValueIterator<V> implements Iterator<V> {
-        Iterator<K> keyIterator;
-        K currentKey;
-
-        public ValueIterator() {
-            keyIterator = CachedTreeMap.this.keySet().iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return keyIterator.hasNext();
-        }
-
-        @Override
-        public V next() {
-            currentKey = keyIterator.next();
-            try {
-                return (V) valueCache.get(currentKey);
-            } catch (ExecutionException e) {
-                logger.error(String.format("get value with key %s exception: %s", currentKey, e), e);
-                return null;
-            }
-        }
-
-        @Override
-        public void remove() {
-            assert keepAppend && !immutable : "Only support remove method with immutable false and keepAppend true";
-            keyIterator.remove();
-            valueCache.invalidate(currentKey);
-        }
-    }
-
-    public FSDataOutputStream openIndexOutput() throws IOException {
-        assert keepAppend && !immutable : "Only support write method with immutable false and keepAppend true";
-        Path indexPath = new Path(getCurrentDir(), ".index");
-        return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8);
-    }
-
-    public FSDataInputStream openIndexInput() throws IOException {
-        Path indexPath = new Path(getCurrentDir(), ".index");
-        return fs.open(indexPath, 8 * 1024 * 1024);
-    }
-
-    public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
-        Path basePath = new Path(baseDir);
-        FileSystem fs = HadoopUtil.getFileSystem(basePath, conf);
-        Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
-        return fs.open(indexPath, 8 * 1024 * 1024);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(size());
-        for (K key : keySet()) {
-            key.write(out);
-            V value = valueCache.getIfPresent(key);
-            if (null != value) {
-                writeValue(key, value);
-            }
-        }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        int size = in.readInt();
-        try {
-            for (int i = 0; i < size; i++) {
-                K key = keyClazz.newInstance();
-                key.readFields(in);
-                super.put(key, null);
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-}
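The most durable idea in the deleted class is its commit() protocol: build the new version in a tmp directory, then make it visible with a single rename, which is atomic for a directory within one HDFS filesystem. A minimal, standalone sketch of that idiom under stated assumptions (VersionPublisher is a name of my own, not Kylin code; the "version_" prefix mirrors the deleted VERSION_PREFIX):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    // Publish-by-rename: readers listing baseDir only ever see complete version dirs.
    public class VersionPublisher {
        public static Path publish(FileSystem fs, Path baseDir, Path workingDir, Configuration conf) throws IOException {
            long ts = System.currentTimeMillis();
            Path versionDir = new Path(baseDir, "version_" + ts);
            Path tmpDir = new Path(baseDir, "tmp_version_" + ts);
            try {
                // copy the working data to a tmp dir first; a crash here leaves only tmp garbage
                FileUtil.copy(fs, workingDir, fs, tmpDir, false, true, conf);
                // one atomic rename makes the whole version visible at once
                if (!fs.rename(tmpDir, versionDir)) {
                    throw new IOException("rename failed: " + tmpDir + " -> " + versionDir);
                }
            } finally {
                if (fs.exists(tmpDir)) {
                    fs.delete(tmpDir, true);   // clean up the tmp dir if publish failed
                }
            }
            return versionDir;
        }
    }

The refactor in this commit moves this responsibility out of the map itself and into the new GlobalDictStore/GlobalDictHDFSStore layer, so the same publish-then-expire lifecycle (maxVersions plus a version TTL) no longer has to live inside a TreeMap subclass.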