http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index cda3c2b..4b12fee 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -18,9 +18,19 @@ package org.apache.kylin.dict; +import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.dict.global.AppendTrieDictionaryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.MoreExecutors; /** * GlobalDictionary based on whole cube, to ensure one value has same dict id in different segments. @@ -28,29 +38,115 @@ import org.apache.kylin.common.util.Dictionary; * Created by sunyerui on 16/5/24. 
*/ public class GlobalDictionaryBuilder implements IDictionaryBuilder { - AppendTrieDictionary.Builder<String> builder; - int baseId; + private AppendTrieDictionaryBuilder builder; + private int baseId; + + private DistributedLock lock; + private String sourceColumn; + //the job thread name is UUID+threadID + private final String jobUUID = Thread.currentThread().getName(); + private int counter; + + private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); @Override public void init(DictionaryInfo dictInfo, int baseId) throws IOException { if (dictInfo == null) { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } - this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir()); + + sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn(); + lock(sourceColumn); + + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); + this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice); this.baseId = baseId; } - + @Override public boolean addValue(String value) { - if (value == null) + if (++counter % 1_000_000 == 0) { + if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) { + logger.info("processed {} values for {}", counter, sourceColumn); + } else { + throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock"); + } + } + + if (value == null) { return false; - - builder.addValue(value); + } + + try { + builder.addValue(value); + } catch (Throwable e) { + checkAndUnlock(); + throw new RuntimeException(String.format("Failed to create global dictionary on %s ", sourceColumn), e); + } + return true; } - + @Override public Dictionary<String> build() throws IOException { - return builder.build(baseId); + try { + if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) { + return builder.build(baseId); + } + } finally { + 
checkAndUnlock(); + } + return new AppendTrieDictionary<>(); + } + + private void lock(final String sourceColumn) throws IOException { + lock = KylinConfig.getInstanceFromEnv().getDistributedLock(); + + if (!lock.lockPath(getLockPath(sourceColumn), jobUUID)) { + logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn); + + final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1); + + Closeable watch = lock.watchPath(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedLock.Watcher() { + @Override + public void process(String path, String data) { + if (!data.equalsIgnoreCase(jobUUID) && lock.lockPath(getLockPath(sourceColumn), jobUUID)) { + try { + bq.put("getLock"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + }); + + long start = System.currentTimeMillis(); + + try { + bq.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + watch.close(); + } + + logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn); + } + } + + private void checkAndUnlock() { + if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) { + lock.unlockPath(getLockPath(sourceColumn)); + } + } + + private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock"; + + private String getLockPath(String pathName) { + return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + "/lock"; + } + + private String getWatchPath(String pathName) { + return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; } }
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java new file mode 100644 index 0000000..ee3a2c2 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.kylin.dict.global;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.dict.AppendTrieDictionary;
import org.apache.kylin.dict.TrieDictionary;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.LinkedList;

/**
 * One node of the in-memory trie. Each node carries a byte fragment ({@link #part}),
 * an optional dictionary id (set when a value ends at this node) and its child nodes.
 * {@link #childrenCount} is the number of nodes in the subtree rooted here, the node
 * itself included, and is kept up to date along the parent chain by the mutators below.
 */
public class AppendDictNode {
    public byte[] part;
    public int id = -1;
    public boolean isEndOfValue;
    public ArrayList<AppendDictNode> children = new ArrayList<>();

    public int nValuesBeneath;
    public AppendDictNode parent;
    public int childrenCount = 1;

    AppendDictNode(byte[] value, boolean isEndOfValue) {
        reset(value, isEndOfValue);
    }

    AppendDictNode(byte[] value, boolean isEndOfValue, ArrayList<AppendDictNode> children) {
        reset(value, isEndOfValue, children);
    }

    /** Re-initialises this node with the given fragment and no children; the id is cleared. */
    void reset(byte[] value, boolean isEndOfValue) {
        reset(value, isEndOfValue, new ArrayList<AppendDictNode>());
    }

    /**
     * Re-initialises this node with the given fragment and children; the id is cleared to -1.
     *
     * @param children nodes to adopt; they are re-parented to this node
     */
    void reset(byte[] value, boolean isEndOfValue, ArrayList<AppendDictNode> children) {
        // Snapshot first: clearChild() empties this.children, which would also empty the
        // argument if the caller passed this node's own list.
        ArrayList<AppendDictNode> adopted = new ArrayList<>(children);
        this.part = value;
        this.isEndOfValue = isEndOfValue;
        clearChild();
        for (AppendDictNode child : adopted) {
            addChild(child);
        }
        this.id = -1;
    }

    /** Drops all children and subtracts their node count from this node and every ancestor. */
    void clearChild() {
        this.children.clear();
        int childrenCountDelta = this.childrenCount - 1;
        for (AppendDictNode p = this; p != null; p = p.parent) {
            p.childrenCount -= childrenCountDelta;
        }
    }

    /** Appends {@code child} as the last child. */
    void addChild(AppendDictNode child) {
        addChild(-1, child);
    }

    /**
     * Inserts {@code child} at {@code index} (appends when {@code index} is negative) and adds
     * its subtree size to this node and every ancestor.
     */
    void addChild(int index, AppendDictNode child) {
        child.parent = this;
        if (index < 0) {
            this.children.add(child);
        } else {
            this.children.add(index, child);
        }
        for (AppendDictNode p = this; p != null; p = p.parent) {
            p.childrenCount += child.childrenCount;
        }
    }

    /** Detaches the child at {@code index} and removes its subtree size from the parent chain. */
    private AppendDictNode removeChild(int index) {
        AppendDictNode child = children.remove(index);
        child.parent = null;
        for (AppendDictNode p = this; p != null; p = p.parent) {
            p.childrenCount -= child.childrenCount;
        }
        return child;
    }

    /**
     * Creates a childless, non-terminal copy of this node (same {@link #part}) and, when this
     * node has a parent, inserts the copy right after this node among its siblings.
     */
    private AppendDictNode duplicateNode() {
        AppendDictNode newChild = new AppendDictNode(part, false);
        newChild.parent = parent;
        if (parent != null) {
            int index = parent.children.indexOf(this);
            parent.addChild(index + 1, newChild);
        }
        return newChild;
    }

    /**
     * Concatenates the fragments along the first-child path, stopping at the first node that
     * ends a value or has no children.
     */
    public byte[] firstValue() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        AppendDictNode p = this;
        while (true) {
            bytes.write(p.part, 0, p.part.length);
            if (p.isEndOfValue || p.children.size() == 0) {
                break;
            }
            p = p.children.get(0);
        }
        return bytes.toByteArray();
    }

    /**
     * Splits the tree at {@code splitNode}: at every level from the split node up to the root,
     * the node on the path and all siblings after it are moved under a duplicate of their
     * parent.
     *
     * @return the top-most node of the split-off tree, {@code splitNode} itself when it has no
     *         parent, or {@code null} when {@code splitNode} is null
     */
    public static AppendDictNode splitNodeTree(final AppendDictNode splitNode) {
        if (splitNode == null) {
            return null;
        }
        AppendDictNode current = splitNode;
        AppendDictNode p = current.parent;
        while (p != null) {
            int index = p.children.indexOf(current);
            assert index != -1;
            AppendDictNode newParent = p.duplicateNode();
            // Move from the back so each moved child lands at the front of newParent,
            // which preserves the original sibling order.
            for (int i = p.children.size() - 1; i >= index; i--) {
                AppendDictNode child = p.removeChild(i);
                newParent.addChild(0, child);
            }
            current = newParent;
            p = p.parent;
        }
        return current;
    }

    /**
     * Serialises the subtree rooted at this node: a head (magic, head size, body size, value
     * count, child-offset width, id width) followed by the nodes in breadth-first order.
     *
     * @throws RuntimeException if the number of bytes written differs from the computed footprint
     */
    public byte[] buildTrieBytes() {
        Stats stats = Stats.stats(this);
        int sizeChildOffset = stats.mbpn_sizeChildOffset;
        int sizeId = stats.mbpn_sizeId;

        // write head
        byte[] head;
        try {
            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
            DataOutputStream headOut = new DataOutputStream(byteBuf);
            headOut.write(AppendTrieDictionary.HEAD_MAGIC);
            headOut.writeShort(0); // head size, will back fill
            headOut.writeInt(stats.mbpn_footprint); // body size
            headOut.writeInt(stats.nValues);
            headOut.write(sizeChildOffset);
            headOut.write(sizeId);
            headOut.close();
            head = byteBuf.toByteArray();
            BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
        } catch (IOException e) {
            throw new RuntimeException(e); // shall not happen, as we are writing to an in-memory buffer
        }

        byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
        System.arraycopy(head, 0, trieBytes, 0, head.length);

        LinkedList<AppendDictNode> open = new LinkedList<AppendDictNode>();
        IdentityHashMap<AppendDictNode, Integer> offsetMap = new IdentityHashMap<AppendDictNode, Integer>();

        // write body
        int o = head.length;
        offsetMap.put(this, o);
        o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
        if (!this.children.isEmpty())
            open.addLast(this);

        while (!open.isEmpty()) {
            AppendDictNode parent = open.removeFirst();
            // The parent's child offset is relative to the start of the body.
            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
            for (int i = 0; i < parent.children.size(); i++) {
                AppendDictNode c = parent.children.get(i);
                boolean isLastChild = (i == parent.children.size() - 1);
                offsetMap.put(c, o);
                o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
                if (!c.children.isEmpty())
                    open.addLast(c);
            }
        }

        if (o != trieBytes.length)
            throw new RuntimeException("Trie serialization wrote " + o + " bytes but " + trieBytes.length + " were expected");
        return trieBytes;
    }

    /** Writes the child offset into a node already in {@code trieBytes}, keeping its two flag bits. */
    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
        trieBytes[parentOffset] |= flags;
    }

    /**
     * Writes one node at {@code offset}: flag bits / child-offset slot, a one-byte fragment
     * length, the fragment, and the id when the node ends a value.
     *
     * @return the offset just past the written node
     * @throws RuntimeException if the fragment is longer than 255 bytes
     */
    private int build_writeNode(AppendDictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
        int o = offset;

        // childOffset
        if (isLastChild)
            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
        if (n.isEndOfValue)
            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
        o += sizeChildOffset;

        // nValueBytes
        if (n.part.length > 255)
            throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
        o++;

        // valueBytes
        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
        o += n.part.length;

        if (n.isEndOfValue) {
            checkValidId(n.id);
            BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
            o += sizeId;
        }

        return o;
    }

    // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state
    private void checkValidId(int id) {
        if (id == 0 || id == -1) {
            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
        }
    }

    @Override
    public String toString() {
        return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
    }

    /** Size statistics of a trie, used to lay out its serialised form. */
    static class Stats {
        public interface Visitor {
            void visit(AppendDictNode n, int level);
        }

        /** Pre-order traversal: visits {@code node} before its children. */
        private static void traverseR(AppendDictNode node, Visitor visitor, int level) {
            visitor.visit(node, level);
            for (AppendDictNode c : node.children)
                traverseR(c, visitor, level + 1);
        }

        /** Post-order traversal: visits {@code node} after its children. */
        private static void traversePostOrderR(AppendDictNode node, Visitor visitor, int level) {
            for (AppendDictNode c : node.children)
                traversePostOrderR(c, visitor, level + 1);
            visitor.visit(node, level);
        }

        public int nValues; // number of values in total
        public int nValueBytesPlain; // number of bytes for all values
        // uncompressed
        public int nValueBytesCompressed; // number of values bytes in Trie
        // (compressed)
        public int maxValueLength; // size of longest value in bytes

        // the trie is multi-byte-per-node
        public int mbpn_nNodes; // number of nodes in trie
        public int mbpn_trieDepth; // depth of trie
        public int mbpn_maxFanOut; // the maximum no. children
        public int mbpn_nChildLookups; // number of child lookups during lookup
        // every value once
        public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
        // value once
        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
        public int mbpn_sizeChildOffset; // size of field childOffset, points to
        // first child in flattened array
        public int mbpn_sizeId; // size of id value, always be 4
        public int mbpn_footprint; // MBPN footprint in bytes

        /**
         * Computes the statistics of the trie rooted at {@code root}. As a side effect every
         * node's {@code nValuesBeneath} is recalculated.
         */
        public static Stats stats(AppendDictNode root) {
            // calculate nEndValueBeneath
            traversePostOrderR(root, new Visitor() {
                @Override
                public void visit(AppendDictNode n, int level) {
                    n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
                    for (AppendDictNode c : n.children)
                        n.nValuesBeneath += c.nValuesBeneath;
                }
            }, 0);

            // run stats
            final Stats s = new Stats();
            final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
            traverseR(root, new Visitor() {
                @Override
                public void visit(AppendDictNode n, int level) {
                    if (n.isEndOfValue)
                        s.nValues++;
                    s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
                    s.nValueBytesCompressed += n.part.length;
                    s.mbpn_nNodes++;
                    if (s.mbpn_trieDepth < level + 1)
                        s.mbpn_trieDepth = level + 1;
                    if (n.children.size() > 0) {
                        if (s.mbpn_maxFanOut < n.children.size())
                            s.mbpn_maxFanOut = n.children.size();
                        int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
                        s.mbpn_nChildLookups += childLookups;
                        s.mbpn_nTotalFanOut += childLookups * n.children.size();
                    }

                    // lenAtLvl holds the fragment length at each level of the current path
                    if (level < lenAtLvl.size())
                        lenAtLvl.set(level, n.part.length);
                    else
                        lenAtLvl.add(n.part.length);
                    int lenSoFar = 0;
                    for (int i = 0; i <= level; i++)
                        lenSoFar += lenAtLvl.get(i);
                    if (lenSoFar > s.maxValueLength)
                        s.maxValueLength = lenSoFar;
                }
            }, 0);

            // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
            s.mbpn_sizeId = 4;
            s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
            s.mbpn_sizeNoValueBytes = 1;
            s.mbpn_sizeChildOffset = 5;
            s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
            while (true) { // minimize the offset size to match the footprint
                int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
                // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
                // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
                if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
                    s.mbpn_sizeChildOffset--;
                    s.mbpn_footprint = t;
                } else
                    break;
            }

            return s;
        }

        /**
         * out print trie for debug
         */
        public void print(AppendDictNode root) {
            print(root, System.out);
        }

        /** Prints one line per node to {@code out}, indented by tree level. */
        public void print(AppendDictNode root, final PrintStream out) {
            traverseR(root, new Visitor() {
                @Override
                public void visit(AppendDictNode n, int level) {
                    for (int i = 0; i < level; i++)
                        out.print(" ");
                    // Charset constant instead of the "UTF-8" name: no checked exception to swallow.
                    out.print(new String(n.part, StandardCharsets.UTF_8));
                    out.print(" - ");
                    if (n.nValuesBeneath > 0)
                        out.print(n.nValuesBeneath);
                    if (n.isEndOfValue)
                        out.print("* [" + n.id + "]");
                    out.print("\n");
                }
            }, 0);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java new file mode 100644 index 0000000..4e820e0 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.kylin.dict.global;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

/**
 * Read-only view over one serialised trie slice: a head (magic, head size, body size, value
 * count, child-offset width, id width) followed by the flattened trie nodes.
 */
public class AppendDictSlice {
    // NOTE: the fifth byte is 0x63 ('c'), so these bytes spell "AppecdTrieDict", not
    // "AppendTrieDict". They identify stored data, so they must not be "corrected" here.
    static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict"
    static final int HEAD_SIZE_I = HEAD_MAGIC.length;
    static final int BIT_IS_LAST_CHILD = 0x80;
    static final int BIT_IS_END_OF_VALUE = 0x40;

    private byte[] trieBytes;

    // non-persistent part
    transient private int headSize;
    transient private int bodyLen;
    transient private int sizeChildOffset;

    transient private int nValues;
    transient private int sizeOfId;
    // mask MUST be long, since childOffset maybe 5 bytes at most
    transient private long childOffsetMask;
    transient private int firstByteOffset;

    /**
     * @param bytes the complete slice (head plus body); kept by reference, not copied
     * @throws IllegalArgumentException if the bytes do not start with {@link #HEAD_MAGIC}
     */
    public AppendDictSlice(byte[] bytes) {
        this.trieBytes = bytes;
        init();
    }

    /** Parses the head and derives the masks and offsets used by the lookups. */
    private void init() {
        if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
            throw new IllegalArgumentException("Wrong file type (magic does not match)");

        try {
            DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
            this.headSize = headIn.readShort();
            this.bodyLen = headIn.readInt();
            this.nValues = headIn.readInt();
            this.sizeChildOffset = headIn.read();
            this.sizeOfId = headIn.read();

            // The two flag bits live in the top bits of the first child-offset byte.
            this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
            this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
        } catch (IOException e) {
            // Runtime exceptions propagate untouched; only the checked one needs wrapping.
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads one slice from {@code in}: the fixed leading part of the head first, to learn the
     * head and body sizes, then the remainder.
     *
     * @throws IllegalArgumentException if the magic does not match or the sizes in the head
     *                                  cannot describe a valid slice
     * @throws IOException              if reading from {@code in} fails
     */
    public static AppendDictSlice deserializeFrom(DataInput in) throws IOException {
        byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
        in.readFully(headPartial);

        if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
            throw new IllegalArgumentException("Wrong file type (magic does not match)");

        DataInputStream headIn = new DataInputStream(//
                new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
        int headSize = headIn.readShort();
        int bodyLen = headIn.readInt();
        headIn.close();

        // The head contains at least the bytes already read; reject sizes that would make the
        // allocation or the copy below fail with an unrelated exception.
        if (headSize < headPartial.length || bodyLen < 0)
            throw new IllegalArgumentException("Corrupted slice header: headSize=" + headSize + ", bodyLen=" + bodyLen);

        byte[] all = new byte[headSize + bodyLen];
        System.arraycopy(headPartial, 0, all, 0, headPartial.length);
        in.readFully(all, headPartial.length, all.length - headPartial.length);

        return new AppendDictSlice(all);
    }

    /**
     * Concatenates the fragments along the first-child path from the root, stopping at the
     * first node that ends a value or has no children.
     */
    public byte[] getFirstValue() {
        int nodeOffset = headSize;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        while (true) {
            int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
            bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
            if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
                break;
            }
            nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
            if (nodeOffset == headSize) {
                // child offset 0 means the node has no children
                break;
            }
        }
        return bytes.toByteArray();
    }

    /**
     * Looks up the id stored for the given value.
     *
     * @param n -- the offset of current node
     * @param inp -- input value bytes to lookup
     * @param o -- offset in the input value bytes matched so far
     * @param inpEnd -- end of input
     * @param roundingFlag -- not used by this implementation; only exact matches are found
     * @return the id stored at the node where the value ends, or -1 if the value is absent
     */
    private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
        while (true) {
            // match the current node
            int p = n + firstByteOffset; // start of node's value
            int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
            for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
                if (trieBytes[p] != inp[o]) {
                    return -1; // mismatch
                }
            }

            // node completely matched, is input all consumed?
            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
            if (o == inpEnd) {
                return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
            }

            // find a child to continue
            int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
            if (c == headSize) // has no children
                return -1;
            byte inpByte = inp[o];
            int comp;
            while (true) {
                p = c + firstByteOffset;
                comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
                if (comp == 0) { // continue in the matching child, reset n and loop again
                    n = c;
                    break;
                } else if (comp < 0) { // try next child
                    if (checkFlag(c, BIT_IS_LAST_CHILD))
                        return -1;
                    c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
                } else { // children are ordered by their first value byte
                    return -1;
                }
            }
        }
    }

    /** Tests a flag bit in the first byte of the node at {@code offset}. */
    private boolean checkFlag(int offset, int bit) {
        return (trieBytes[offset] & bit) > 0;
    }

    /**
     * Looks up the id of {@code value[offset, offset + len)}.
     *
     * @param roundingFlag not used by this implementation
     * @return the id, or -1 if the value is not in this slice
     */
    public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
        int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
        return id;
    }

    /** Rebuilds the in-memory node tree from the serialised bytes and returns its root. */
    public AppendDictNode rebuildTrieTree() {
        return rebuildTrieTreeR(headSize, null);
    }

    /**
     * Rebuilds the sibling run starting at offset {@code n} and, recursively, everything below.
     *
     * @param parent node the siblings are attached to, or {@code null} for the root
     * @return the last rebuilt node when {@code parent} is null, otherwise {@code null}
     */
    private AppendDictNode rebuildTrieTreeR(int n, AppendDictNode parent) {
        AppendDictNode root = null;
        while (true) {
            int p = n + firstByteOffset;
            int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);

            byte[] value = new byte[parLen];
            System.arraycopy(trieBytes, p, value, 0, parLen);

            AppendDictNode node = new AppendDictNode(value, isEndOfValue);
            if (isEndOfValue) {
                int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
                node.id = id;
            }

            if (parent == null) {
                root = node;
            } else {
                parent.addChild(node);
            }

            if (childOffset != 0) {
                rebuildTrieTreeR(childOffset + headSize, node);
            }

            if (checkFlag(n, BIT_IS_LAST_CHILD)) {
                break;
            } else {
                n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
            }
        }
        return root;
    }

    /**
     * Walks all nodes sequentially and checks that the layout is consistent: every node lies
     * inside the buffer, and every sibling run is the target of exactly one recorded child offset.
     *
     * @return {@code true} if no inconsistency was found
     */
    public boolean doCheck() {
        int offset = headSize;
        HashSet<Integer> parentSet = new HashSet<>();
        boolean lastChild = false;

        while (offset < trieBytes.length) {
            if (lastChild) {
                boolean contained = parentSet.remove(offset - headSize);
                // Can't find parent, the data is corrupted
                if (!contained) {
                    return false;
                }
                lastChild = false;
            }
            int p = offset + firstByteOffset;

            // Node header (child offset + length byte) overflows, the data is corrupted.
            // Checked before reading so that truncated data yields false instead of an exception.
            if (trieBytes.length < p) {
                return false;
            }

            int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
            boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);

            // Copy value overflow, the data is corrupted
            if (trieBytes.length < p + parLen) {
                return false;
            }

            // Check id is fine
            if (isEndOfValue) {
                // Id overflow, the data is corrupted
                if (trieBytes.length < p + parLen + sizeOfId) {
                    return false;
                }
                BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
            }

            // Record it if has children
            if (childOffset != 0) {
                parentSet.add(childOffset);
            }

            // brothers done, move to next parent
            if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
                lastChild = true;
            }

            // move to next node
            offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
        }

        // ParentMap is empty, meaning all nodes has parent, the data is correct
        return parentSet.isEmpty();
    }

    @Override
    public String toString() {
        return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(trieBytes);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AppendDictSlice)) {
            return false;
        }
        AppendDictSlice that = (AppendDictSlice) o;
        return Arrays.equals(this.trieBytes, that.trieBytes);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java new file mode 100644 index 0000000..323fe6b --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.dict.global;

import org.apache.kylin.common.util.Bytes;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

/**
 * Byte-array key identifying a dictionary slice. Equality and hashing are based on the
 * array content; ordering is delegated to {@code Bytes.compareTo}.
 */
public class AppendDictSliceKey implements Comparable<AppendDictSliceKey> {
    /** Key wrapping a zero-length array. */
    static final AppendDictSliceKey START_KEY = AppendDictSliceKey.wrap(new byte[0]);

    byte[] key;

    /** Creates a key around {@code key}; the array is stored as-is, not copied. */
    public static AppendDictSliceKey wrap(byte[] key) {
        AppendDictSliceKey wrapped = new AppendDictSliceKey();
        wrapped.key = key;
        return wrapped;
    }

    @Override
    public String toString() {
        return Bytes.toStringBinary(key);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(key);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AppendDictSliceKey)) {
            return false;
        }
        return Arrays.equals(this.key, ((AppendDictSliceKey) o).key);
    }

    @Override
    public int compareTo(AppendDictSliceKey that) {
        return Bytes.compareTo(key, that.key);
    }

    /** Writes the key as a 4-byte length followed by the key bytes. */
    public void write(DataOutput out) throws IOException {
        final int length = key.length;
        out.writeInt(length);
        out.write(key);
    }

    /** Reads a key written by {@link #write(DataOutput)}, replacing the current content. */
    public void readFields(DataInput in) throws IOException {
        final int length = in.readInt();
        key = new byte[length];
        in.readFully(key);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java new file mode 100644 index 0000000..90d65b6 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java @@ -0,0 +1,283 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict.global; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.dict.AppendTrieDictionary; +import org.apache.kylin.dict.BytesConverter; +import org.apache.kylin.dict.StringBytesConverter; + +import java.io.IOException; +import java.util.List; +import java.util.TreeMap; + +import static com.google.common.base.Preconditions.checkState; + +public class AppendTrieDictionaryBuilder { + + private final String baseDir; + private final String workingDir; + private final int maxEntriesPerSlice; + + private GlobalDictStore store; + private int maxId; + private int maxValueLength; + private int nValues; + private BytesConverter bytesConverter; + private TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name + + private AppendDictSliceKey curKey; + private AppendDictNode curNode; + + public AppendTrieDictionaryBuilder(String resourceDir, int maxEntriesPerSlice) throws IOException { + this.baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourceDir + "/"; + this.workingDir = this.baseDir + 
"/working"; + this.maxEntriesPerSlice = maxEntriesPerSlice; + init(); + } + + public synchronized void init() throws IOException { + this.store = new GlobalDictHDFSStore(baseDir); + store.prepareForWrite(workingDir); + + Long[] versions = store.listAllVersions(); + + if (versions.length == 0) { // build dict for the first time + this.maxId = 0; + this.maxValueLength = 0; + this.nValues = 0; + this.bytesConverter = new StringBytesConverter(); + + } else { // append values to last version + GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]); + this.maxId = metadata.maxId; + this.maxValueLength = metadata.maxValueLength; + this.nValues = metadata.nValues; + this.bytesConverter = metadata.bytesConverter; + this.sliceFileMap = new TreeMap<>(metadata.sliceFileMap); + } + } + + @SuppressWarnings("unchecked") + public void addValue(String value) throws IOException { + byte[] valueBytes = bytesConverter.convertToBytes(value); + + if (sliceFileMap.isEmpty()) { + curNode = new AppendDictNode(new byte[0], false); + sliceFileMap.put(AppendDictSliceKey.START_KEY, null); + } + checkState(sliceFileMap.firstKey().equals(AppendDictSliceKey.START_KEY), "first key should be \"\", but got \"%s\"", sliceFileMap.firstKey()); + + AppendDictSliceKey nextKey = sliceFileMap.floorKey(AppendDictSliceKey.wrap(valueBytes)); + + if (curKey != null && !nextKey.equals(curKey)) { + // you may suppose nextKey>=curKey, but nextKey<curKey could happen when a node splits. + // for example, suppose we have curNode [1-10], and add value "2" triggers split: + // first half [1-5] is flushed out, make second half [6-10] curNode and "6" curKey. + // then value "3" is added, now we got nextKey "1" smaller than curKey "6", surprise! + // in this case, we need to flush [6-10] and read [1-5] back. 
+ flushCurrentNode(); + curNode = null; + } + if (curNode == null) { // read next slice + AppendDictSlice slice = store.readSlice(workingDir, sliceFileMap.get(nextKey)); + curNode = slice.rebuildTrieTree(); + } + curKey = nextKey; + + addValueR(curNode, valueBytes, 0); + + // split slice if it's too large + if (curNode.childrenCount > maxEntriesPerSlice) { + AppendDictNode newRoot = splitNodeTree(curNode); + flushCurrentNode(); + curNode = newRoot; + curKey = AppendDictSliceKey.wrap(newRoot.firstValue()); + sliceFileMap.put(curKey, null); + } + + maxValueLength = Math.max(maxValueLength, valueBytes.length); + } + + public AppendTrieDictionary build(int baseId) throws IOException { + if (curNode != null) { + flushCurrentNode(); + } + + GlobalDictMetadata metadata = new GlobalDictMetadata(baseId, this.maxId, this.maxValueLength, this.nValues, this.bytesConverter, sliceFileMap); + store.commit(workingDir, metadata); + + AppendTrieDictionary dict = new AppendTrieDictionary(); + dict.init(this.baseDir); + return dict; + } + + private void flushCurrentNode() throws IOException { + String newSliceFile = store.writeSlice(workingDir, curKey, curNode); + String oldSliceFile = sliceFileMap.put(curKey, newSliceFile); + if (oldSliceFile != null) { + store.deleteSlice(workingDir, oldSliceFile); + } + } + + private void addValueR(AppendDictNode node, byte[] value, int start) { + // match the value part of current node + int i = 0, j = start; + int n = node.part.length, nn = value.length; + int comp = 0; + for (; i < n && j < nn; i++, j++) { + comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]); + if (comp != 0) + break; + } + + if (j == nn) { + // if value fully matched within the current node + if (i == n) { + // on first match, mark end of value and assign an ID + if (!node.isEndOfValue) { + node.id = createNextId(); + node.isEndOfValue = true; + } + } else { + // otherwise, split the current node into two + AppendDictNode c = new 
AppendDictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + c.id = node.id; + node.reset(BytesUtil.subarray(node.part, 0, i), true); + node.addChild(c); + node.id = createNextId(); + } + return; + } + + // if partially matched the current, split the current node, add the new + // value, make a 3-way + if (i < n) { + AppendDictNode c1 = new AppendDictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + c1.id = node.id; + AppendDictNode c2 = addNodeMaybeOverflow(value, j, nn); + node.reset(BytesUtil.subarray(node.part, 0, i), false); + if (comp < 0) { + node.addChild(c1); + node.addChild(c2); + } else { + node.addChild(c2); + node.addChild(c1); + } + return; + } + + // out matched the current, binary search the next byte for a child node + // to continue + byte lookfor = value[j]; + int lo = 0; + int hi = node.children.size() - 1; + int mid = 0; + boolean found = false; + comp = 0; + while (!found && lo <= hi) { + mid = lo + (hi - lo) / 2; + AppendDictNode c = node.children.get(mid); + comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]); + if (comp < 0) + hi = mid - 1; + else if (comp > 0) + lo = mid + 1; + else + found = true; + } + if (found) { + // found a child node matching the first byte, continue in that child + addValueR(node.children.get(mid), value, j); + } else { + // otherwise, make the value a new child + AppendDictNode c = addNodeMaybeOverflow(value, j, nn); + node.addChild(comp <= 0 ? 
mid : mid + 1, c); + } + } + + private int createNextId() { + int id = ++maxId; + checkValidId(id); + nValues++; + return id; + } + + // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state + private void checkValidId(int id) { + if (id == 0 || id == -1) { + throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); + } + } + + // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree + private AppendDictNode addNodeMaybeOverflow(byte[] value, int start, int end) { + AppendDictNode head = null; + AppendDictNode current = null; + for (; start + 255 < end; start += 255) { + AppendDictNode c = new AppendDictNode(BytesUtil.subarray(value, start, start + 255), false); + if (head == null) { + head = c; + current = c; + } else { + current.addChild(c); + current = c; + } + } + AppendDictNode last = new AppendDictNode(BytesUtil.subarray(value, start, end), true); + last.id = createNextId(); + if (head == null) { + head = last; + } else { + current.addChild(last); + } + return head; + } + + private AppendDictNode splitNodeTree(AppendDictNode root) { + AppendDictNode parent = root; + int childCountToSplit = (int) (maxEntriesPerSlice * 1.0 / 2); + while (true) { + List<AppendDictNode> children = parent.children; + if (children.size() == 0) { + break; + } + if (children.size() == 1) { + parent = children.get(0); + } else { + for (int i = children.size() - 1; i >= 0; i--) { + parent = children.get(i); + if (childCountToSplit > children.get(i).childrenCount) { + childCountToSplit -= children.get(i).childrenCount; + } else { + childCountToSplit--; + break; + } + } + } + } + return AppendDictNode.splitNodeTree(parent); + } + + // Only used for test + void setMaxId(int id) { + this.maxId = id; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java 
---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java new file mode 100644 index 0000000..94b6e9d --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.dict.global; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; + +import static org.apache.kylin.dict.global.GlobalDictHDFSStore.BUFFER_SIZE; + +/** + * Created by sunyerui on 16/11/15. 
+ */ +public class AppendTrieDictionaryChecker { + + public boolean runChecker(String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = HadoopUtil.getFileSystem(basePath); + List<Path> sliceList = new ArrayList<>(); + List<Path> corruptedSliceList = new ArrayList<>(); + listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList); + + for (Path path : sliceList) { + if (!doCheck(fs, path)) { + System.out.println("AppendDict Slice " + path + " corrupted"); + corruptedSliceList.add(path); + } else { + System.out.println("AppendDict Slice " + path + " is right"); + } + } + + if (corruptedSliceList.isEmpty()) { + System.out.println("ALL AppendDict Slices is right"); + return true; + } else { + System.out.println("Some AppendDict Slice(s) corrupted: "); + for (Path path : corruptedSliceList) { + System.out.println(path.toString()); + } + return false; + } + } + + public void listDictSlicePath(FileSystem fs, FileStatus path, List<Path> list) throws IOException { + if (path.isDirectory()) { + for (FileStatus status : fs.listStatus(path.getPath())) { + listDictSlicePath(fs, status, list); + } + } else { + if (path.getPath().getName().startsWith(GlobalDictHDFSStore.IndexFormatV1.SLICE_PREFIX)) { + list.add(path.getPath()); + } + } + } + + public boolean doCheck(FileSystem fs, Path filePath) { + try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) { + AppendDictSlice slice = AppendDictSlice.deserializeFrom(input); + return slice.doCheck(); + } catch (Exception e) { + return false; + } catch (Error e) { + return false; + } + } + + public static void main(String[] args) throws IOException { + String path = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/"; + if (args.length > 0) { + path = args[0]; + } + System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path); + AppendTrieDictionaryChecker checker = new AppendTrieDictionaryChecker(); + if (checker.runChecker(path)) { + 
System.exit(0); + } else { + System.exit(-1); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java new file mode 100644 index 0000000..b30d5b9 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict.global; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.dict.BytesConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +public class GlobalDictHDFSStore extends GlobalDictStore { + + static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class); + static final String V1_INDEX_NAME = ".index"; + static final String V2_INDEX_NAME = ".index_v2"; + static final String VERSION_PREFIX = "version_"; + static final int BUFFER_SIZE = 8 * 1024 * 1024; + + private final Path basePath; + private final Configuration conf; + private final FileSystem fileSystem; + + public GlobalDictHDFSStore(String baseDir) throws IOException { + super(baseDir); + this.basePath = new Path(baseDir); + this.conf = HadoopUtil.getCurrentConfiguration(); + this.fileSystem = HadoopUtil.getFileSystem(baseDir); + + if (!fileSystem.exists(basePath)) { + logger.info("Global dict at {} doesn't exist, create a new one", basePath); + fileSystem.mkdirs(basePath); + } + + migrateOldLayout(); + } + + // Previously we put slice files and index file directly in base directory, + // should migrate to the new versioned layout + private void migrateOldLayout() throws IOException { + FileStatus[] sliceFiles = 
fileSystem.listStatus(basePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX); + } + }); + Path indexFile = new Path(basePath, V1_INDEX_NAME); + + if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout + final long version = System.currentTimeMillis(); + Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version); + Path versionDir = getVersionDir(version); + + logger.info("Convert global dict at {} to new layout with version {}", basePath, version); + + fileSystem.mkdirs(tempDir); + // convert to new layout + try { + // copy index and slice files to temp + FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf); + for (FileStatus sliceFile : sliceFiles) { + FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf); + } + // rename + fileSystem.rename(tempDir, versionDir); + // delete index and slices files in base dir + fileSystem.delete(indexFile, false); + for (FileStatus sliceFile : sliceFiles) { + fileSystem.delete(sliceFile.getPath(), true); + } + + } finally { + if (fileSystem.exists(tempDir)) { + fileSystem.delete(tempDir, true); + } + } + } + } + + @Override + void prepareForWrite(String workingDir) throws IOException { + // TODO create lock file + Path working = new Path(workingDir); + + if (fileSystem.exists(working)) { + fileSystem.delete(working, true); + logger.info("Working directory {} exits, delete it first", working); + } + + // when build dict, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt + Long[] versions = listAllVersions(); + if (versions.length > 0) { + Path latestVersion = getVersionDir(versions[versions.length - 1]); + FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf); + } else { + fileSystem.mkdirs(working); + } + } + + @Override + public Long[] listAllVersions() throws IOException { + FileStatus[] versionDirs 
= fileSystem.listStatus(basePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(VERSION_PREFIX); + } + }); + TreeSet<Long> versions = new TreeSet<>(); + for (int i = 0; i < versionDirs.length; i++) { + Path path = versionDirs[i].getPath(); + versions.add(Long.parseLong(path.getName().substring(VERSION_PREFIX.length()))); + } + return versions.toArray(new Long[versions.size()]); + } + + @Override + public Path getVersionDir(long version) { + return new Path(basePath, VERSION_PREFIX + version); + } + + @Override + public GlobalDictMetadata getMetadata(long version) throws IOException { + Path versionDir = getVersionDir(version); + FileStatus[] indexFiles = fileSystem.listStatus(versionDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(V1_INDEX_NAME); + } + }); + checkState(indexFiles.length == 1, "zero or more than one index file found: %s", Arrays.toString(indexFiles)); + + IndexFormat format; + String indexFile = indexFiles[0].getPath().getName(); + if (V2_INDEX_NAME.equals(indexFile)) { + format = new IndexFormatV2(fileSystem, conf); + } else if (V1_INDEX_NAME.equals(indexFile)) { + format = new IndexFormatV1(fileSystem, conf); + } else { + throw new RuntimeException("Unknown index file: " + indexFile); + } + + return format.readIndexFile(versionDir); + } + + @Override + public AppendDictSlice readSlice(String directory, String sliceFileName) throws IOException { + Path path = new Path(directory, sliceFileName); + logger.info("read slice from {}", path); + try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) { + return AppendDictSlice.deserializeFrom(input); + } + } + + @Override + public String writeSlice(String workingDir, AppendDictSliceKey key, AppendDictNode slice) throws IOException { + //write new slice + String sliceFile = IndexFormatV2.sliceFileName(key); + Path path = new Path(workingDir, sliceFile); + + logger.info("write slice 
with key {} into file {}", key, path); + try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) { + byte[] bytes = slice.buildTrieBytes(); + out.write(bytes); + } + return sliceFile; + } + + @Override + public void deleteSlice(String workingDir, String sliceFileName) throws IOException { + Path path = new Path(workingDir, sliceFileName); + logger.info("delete slice at {}", path); + if (fileSystem.exists(path)) { + fileSystem.delete(path, false); + } + } + + @Override + public void commit(String workingDir, GlobalDictMetadata metadata) throws IOException { + Path workingPath = new Path(workingDir); + + // delete v1 index file + Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME); + if (fileSystem.exists(oldIndexFile)) { + fileSystem.delete(oldIndexFile, false); + } + // write v2 index file + IndexFormat index = new IndexFormatV2(fileSystem, conf); + index.writeIndexFile(workingPath, metadata); + index.sanityCheck(workingPath, metadata); + + // copy working dir to newVersion dir + Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis()); + fileSystem.rename(workingPath, newVersionPath); + + cleanUp(); + } + + // Check versions count, delete expired versions + private void cleanUp() throws IOException { + Long[] versions = listAllVersions(); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < versions.length - maxVersions; i++) { + if (versions[i] + versionTTL < timestamp) { + fileSystem.delete(getVersionDir(versions[i]), true); + } + } + } + + @Override + public String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { + checkArgument(baseDir.startsWith(srcConfig.getHdfsWorkingDirectory()), "Please check why current directory {} doesn't belong to source working directory {}", baseDir, srcConfig.getHdfsWorkingDirectory()); + + final String dstBaseDir = baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()); + + Long[] versions 
= listAllVersions(); + if (versions.length == 0) { // empty dict, nothing to copy + return dstBaseDir; + } + + Path srcVersionDir = getVersionDir(versions[versions.length - 1]); + Path dstVersionDir = new Path(srcVersionDir.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory())); + FileSystem dstFS = dstVersionDir.getFileSystem(conf); + if (dstFS.exists(dstVersionDir)) { + dstFS.delete(dstVersionDir, true); + } + FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf); + + return dstBaseDir; + } + + public interface IndexFormat { + GlobalDictMetadata readIndexFile(Path dir) throws IOException; + + void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException; + + void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException; + } + + public static class IndexFormatV1 implements IndexFormat { + static final String SLICE_PREFIX = "cached_"; + + protected final FileSystem fs; + protected final Configuration conf; + + protected IndexFormatV1(FileSystem fs, Configuration conf) { + this.fs = fs; + this.conf = conf; + } + + @Override + public GlobalDictMetadata readIndexFile(Path dir) throws IOException { + Path indexFile = new Path(dir, V1_INDEX_NAME); + try (FSDataInputStream in = fs.open(indexFile)) { + int baseId = in.readInt(); + int maxId = in.readInt(); + int maxValueLength = in.readInt(); + int nValues = in.readInt(); + String converterName = in.readUTF(); + BytesConverter converter; + try { + converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e); + } + + int nSlices = in.readInt(); + TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>(); + for (int i = 0; i < nSlices; i++) { + AppendDictSliceKey key = new AppendDictSliceKey(); + key.readFields(in); + sliceFileMap.put(key, sliceFileName(key)); + } + // make sure 
first key is always "" + String firstFile = sliceFileMap.remove(sliceFileMap.firstKey()); + sliceFileMap.put(AppendDictSliceKey.START_KEY, firstFile); + + return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap); + } + } + + //only for test + @Override + public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException { + Path indexFile = new Path(dir, V1_INDEX_NAME); + try (FSDataOutputStream out = fs.create(indexFile, true)) { + out.writeInt(metadata.baseId); + out.writeInt(metadata.maxId); + out.writeInt(metadata.maxValueLength); + out.writeInt(metadata.nValues); + out.writeUTF(metadata.bytesConverter.getClass().getName()); + out.writeInt(metadata.sliceFileMap.size()); + for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { + entry.getKey().write(out); + } + } + } + + @Override + public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException { + throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported"); + } + + public static String sliceFileName(AppendDictSliceKey key) { + return SLICE_PREFIX + key; + } + } + + public static class IndexFormatV2 extends IndexFormatV1 { + static final String SLICE_PREFIX = "cached_"; + static final int MINOR_VERSION_V1 = 0x01; + + protected IndexFormatV2(FileSystem fs, Configuration conf) { + super(fs, conf); + } + + @Override + public GlobalDictMetadata readIndexFile(Path dir) throws IOException { + Path indexFile = new Path(dir, V2_INDEX_NAME); + try (FSDataInputStream in = fs.open(indexFile)) { + byte minorVersion = in.readByte(); // include a header to allow minor format changes + if (minorVersion != MINOR_VERSION_V1) { + throw new RuntimeException("Unsupported minor version " + minorVersion); + } + int baseId = in.readInt(); + int maxId = in.readInt(); + int maxValueLength = in.readInt(); + int nValues = in.readInt(); + String converterName = in.readUTF(); + BytesConverter converter; + 
try { + converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e); + } + + int nSlices = in.readInt(); + TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>(); + for (int i = 0; i < nSlices; i++) { + AppendDictSliceKey key = new AppendDictSliceKey(); + key.readFields(in); + String sliceFileName = in.readUTF(); + sliceFileMap.put(key, sliceFileName); + } + + return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap); + } + } + + @Override + public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException { + Path indexFile = new Path(dir, V2_INDEX_NAME); + try (FSDataOutputStream out = fs.create(indexFile, true)) { + out.writeByte(MINOR_VERSION_V1); + out.writeInt(metadata.baseId); + out.writeInt(metadata.maxId); + out.writeInt(metadata.maxValueLength); + out.writeInt(metadata.nValues); + out.writeUTF(metadata.bytesConverter.getClass().getName()); + out.writeInt(metadata.sliceFileMap.size()); + for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { + entry.getKey().write(out); + out.writeUTF(entry.getValue()); + } + } + } + + @Override + public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException { + for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { + if (!fs.exists(new Path(dir, entry.getValue()))) { + throw new RuntimeException("The slice file " + entry.getValue() + " for the key: " + entry.getKey() + " must be existed!"); + } + } + } + + public static String sliceFileName(AppendDictSliceKey key) { + return String.format("%s%d_%d", SLICE_PREFIX, System.currentTimeMillis(), key.hashCode()); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java 
---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java new file mode 100644 index 0000000..7c89ea2 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict.global; + +import com.google.common.base.Preconditions; +import org.apache.kylin.dict.BytesConverter; + +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Encapsulates the metadata for a particular version of the global dictionary. + * Usually each version of a global dictionary stores its metadata in an index file. 
+ */ +public class GlobalDictMetadata { + public final int baseId; + public final int maxId; + public final int maxValueLength; + public final int nValues; + public final BytesConverter bytesConverter; + public final TreeMap<AppendDictSliceKey, String> sliceFileMap; // slice key -> slice file name + + public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap<AppendDictSliceKey, String> sliceFileMap) { + + Preconditions.checkNotNull(bytesConverter, "bytesConverter"); + Preconditions.checkNotNull(sliceFileMap, "sliceFileMap"); + + this.baseId = baseId; + this.maxId = maxId; + this.maxValueLength = maxValueLength; + this.nValues = nValues; + this.bytesConverter = bytesConverter; + this.sliceFileMap = new TreeMap<>(sliceFileMap); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java new file mode 100644 index 0000000..eaf0729 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict.global; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; + +import java.io.IOException; + +public abstract class GlobalDictStore { + + protected final String baseDir; // base directory containing all versions of this global dict + protected final int maxVersions; + protected final int versionTTL; + + protected GlobalDictStore(String baseDir) { + this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir"); + this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); + } + + // workingDir should be an absolute path, will create if not exists + abstract void prepareForWrite(String workingDir) throws IOException; + + /** + * @return all versions of this dictionary in ascending order + * @throws IOException on I/O error + */ + public abstract Long[] listAllVersions() throws IOException; + + // return the path of specified version dir + public abstract Path getVersionDir(long version); + + /** + * Get the metadata for a particular version of the dictionary. + * @param version version number + * @return <i>GlobalDictMetadata</i> for the specified version + * @throws IOException on I/O error + */ + public abstract GlobalDictMetadata getMetadata(long version) throws IOException; + + /** + * Read a <i>DictSlice</i> from a slice file. 
+ * @param workingDir directory of the slice file + * @param sliceFileName file name of the slice + * @return a <i>DictSlice</i> + * @throws IOException on I/O error + */ + public abstract AppendDictSlice readSlice(String workingDir, String sliceFileName) throws IOException; + + /** + * Write a slice with the given key to the specified directory. + * @param workingDir where to write the slice, should exist + * @param key slice key + * @param slice slice to write + * @return file name of the new written slice + * @throws IOException on I/O error + */ + public abstract String writeSlice(String workingDir, AppendDictSliceKey key, AppendDictNode slice) throws IOException; + + /** + * Delete a slice with the specified file name. + * @param workingDir directory of the slice file, should exist + * @param sliceFileName file name of the slice, should exist + * @throws IOException on I/O error + */ + public abstract void deleteSlice(String workingDir, String sliceFileName) throws IOException; + + /** + * commit the <i>DictSlice</i> and <i>GlobalDictMetadata</i> in workingDir to new versionDir + * @param workingDir where store the tmp slice and index, should exist + * @param globalDictMetadata the metadata of global dict + * @throws IOException on I/O error + */ + public abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException; + + /** + * Copy the latest version of this dict to another meta. The source is unchanged. + * @param srcConfig config of source meta + * @param dstConfig config of destination meta + * @return the new base directory for destination meta + * @throws IOException on I/O error + */ + public abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException; +}