http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
deleted file mode 100644
index e2af338..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * Created by sunyerui on 16/4/28.
- */
-public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
-
-    public static final String BASE_DIR = "file:///tmp/kylin_append_dict";
-    public static final String RESOURCE_DIR = "/dict/append_dict_test";
-
-    @Before
-    public void setUp() {
-        staticCreateTestMetadata();
-        System.setProperty("kylin.dictionary.append-entry-size", "50000");
-        System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
-    }
-
-    @After
-    public void after() {
-        cleanup();
-        staticCleanupTestMetadata();
-    }
-
-    public static void cleanup() {
-        Path basePath = new Path(BASE_DIR);
-        try {
-            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
-        } catch (IOException e) {}
-    }
-
-    public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "字典", "字典树", "字母", // non-ascii characters
-            "", // empty
-            
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
-            
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
-            
"paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
-              + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
-            "paint", "tar", "try", // some dup
-    };
-
-    @Test
-    public void testStringRepeatly() throws IOException {
-        ArrayList<String> list = new ArrayList<>();
-        Collections.addAll(list, words);
-        ArrayList<String> notfound = new ArrayList<>();
-        notfound.add("pa");
-        notfound.add("pars");
-        notfound.add("tri");
-        notfound.add("字");
-        for (int i = 0; i < 100; i++) {
-            testStringDictAppend(list, notfound, true);
-        }
-    }
-
-    @Test
-    public void englishWordsTest() throws Exception {
-        InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
-        ArrayList<String> str = loadStrings(is);
-        testStringDictAppend(str, null, false);
-    }
-
-    @Test
-    public void categoryNamesTest() throws Exception {
-        InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
-        ArrayList<String> str = loadStrings(is);
-        testStringDictAppend(str, null, true);
-    }
-
-    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
-        ArrayList<String> r = new ArrayList<String>();
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-        try {
-            String word;
-            while ((word = reader.readLine()) != null) {
-                word = word.trim();
-                if (word.isEmpty() == false)
-                    r.add(word);
-            }
-        } finally {
-            reader.close();
-            is.close();
-        }
-        return r;
-    }
-
-    @Ignore("need huge key set")
-    @Test
-    public void testHugeKeySet() throws IOException {
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        AppendTrieDictionary<String> dict = null;
-
-        InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-        try {
-            String word;
-            while ((word = reader.readLine()) != null) {
-                word = word.trim();
-                if (!word.isEmpty())
-                    b.addValue(word);
-            }
-        } finally {
-            reader.close();
-            is.close();
-        }
-        dict = b.build(0);
-        dict.dump(System.out);
-    }
-
-    private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
-        Random rnd = new Random(System.currentTimeMillis());
-        ArrayList<String> strList = new ArrayList<String>();
-        strList.addAll(list);
-        if (shuffleList) {
-            Collections.shuffle(strList, rnd);
-        }
-        BytesConverter converter = new StringBytesConverter();
-
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        AppendTrieDictionary<String> dict = null;
-        TreeMap<Integer, String> checkMap = new TreeMap<>();
-        int firstAppend = rnd.nextInt(strList.size() / 2);
-        int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
-        int appendIndex = 0;
-        int checkIndex = 0;
-
-        for (; appendIndex < firstAppend; appendIndex++) {
-            b.addValue(strList.get(appendIndex));
-        }
-        dict = b.build(0);
-        dict.dump(System.out);
-        for (; checkIndex < firstAppend; checkIndex++) {
-            String str = strList.get(checkIndex);
-            byte[] bytes = converter.convertToBytes(str);
-            int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
-            assertNotEquals(String.format("Value %s not exist", str), -1, id);
-            assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
-            checkMap.put(id, str);
-        }
-
-        // reopen dict and append
-//        b = AppendTrieDictionary.Builder.create(dict);
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
-        for (; appendIndex < secondAppend; appendIndex++) {
-            b.addValue(strList.get(appendIndex));
-        }
-        AppendTrieDictionary newDict = b.build(0);
-        assert newDict == dict;
-        dict = newDict;
-        dict.dump(System.out);
-        checkIndex = 0;
-        for (; checkIndex < secondAppend; checkIndex++) {
-            String str = strList.get(checkIndex);
-            byte[] bytes = converter.convertToBytes(str);
-            int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
-            assertNotEquals(String.format("Value %s not exist", str), -1, id);
-            if (checkIndex < firstAppend) {
-                assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
-            } else {
-                // check second append str, should be new id
-                assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
-                checkMap.put(id, str);
-            }
-        }
-
-        // reopen dict and append rest str
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
-        for (; appendIndex < strList.size(); appendIndex++) {
-            b.addValue(strList.get(appendIndex));
-        }
-        newDict = b.build(0);
-        assert newDict == dict;
-        dict = newDict;
-        dict.dump(System.out);
-        checkIndex = 0;
-        for (; checkIndex < strList.size(); checkIndex++) {
-            String str = strList.get(checkIndex);
-            byte[] bytes = converter.convertToBytes(str);
-            int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
-            assertNotEquals(String.format("Value %s not exist", str), -1, id);
-            if (checkIndex < secondAppend) {
-                assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
-            } else {
-                // check third append str, should be new id
-                assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
-                checkMap.put(id, str);
-            }
-        }
-        if (notfound != null) {
-            for (String s : notfound) {
-                byte[] bytes = converter.convertToBytes(s);
-                int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
-                assertEquals(-1, id);
-            }
-        }
-
-        dict = testSerialize(dict, converter);
-        for (String str : strList) {
-            byte[] bytes = converter.convertToBytes(str);
-            int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
-            assertNotEquals(String.format("Value %s not exist", str), -1, id);
-            assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
-        }
-    }
-
-    private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) {
-        try {
-            ByteArrayOutputStream bout = new ByteArrayOutputStream();
-            DataOutputStream dataout = new DataOutputStream(bout);
-            dict.write(dataout);
-            dataout.close();
-            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
-            DataInputStream datain = new DataInputStream(bin);
-            AppendTrieDictionary<String> r = new AppendTrieDictionary<String>();
-            r.readFields(datain);
-            datain.close();
-            return r;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testMaxInteger() throws IOException {
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        builder.setMaxId(Integer.MAX_VALUE - 2);
-        builder.addValue("a");
-        builder.addValue("ab");
-        builder.addValue("acd");
-        builder.addValue("ac");
-        AppendTrieDictionary dict = builder.build(0);
-        assertEquals(2147483646, dict.getIdFromValueImpl("a", 0));
-        assertEquals(2147483647, dict.getIdFromValueImpl("ab", 0));
-        assertEquals(-2147483647, dict.getIdFromValueImpl("ac", 0));
-        assertEquals(-2147483648, dict.getIdFromValueImpl("acd", 0));
-    }
-
-    @Ignore("Only occurred when value is very long (>8000 bytes)")
-    @Test
-    public void testSuperLongValue() throws IOException {
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        String value = "a";
-        for (int i = 0; i < 10000; i++) {
-            value += "a";
-            try {
-                builder.addValue(value);
-            } catch (StackOverflowError e) {
-                System.out.println("\nstack overflow " + i);
-                throw e;
-            }
-        }
-        AppendTrieDictionary dictionary = builder.build(0);
-        dictionary.getMaxId();
-    }
-
-    private static class SharedBuilderThread extends Thread {
-        CountDownLatch startLatch;
-        CountDownLatch finishLatch;
-        String resourcePath;
-        String prefix;
-        int count;
-
-        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
-            this.startLatch = startLatch;
-            this.finishLatch = finishLatch;
-            this.resourcePath = resourcePath;
-            this.prefix = prefix;
-            this.count = count;
-        }
-
-        @Override
-        public void run() {
-            try {
-                AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
-                startLatch.countDown();
-                for (int i = 0; i < count; i++) {
-                    builder.addValue(prefix + i);
-                }
-                builder.build(0);
-                finishLatch.countDown();
-            } catch (IOException e) {}
-        }
-    }
-
-    @Test
-    public void testSharedBuilder() throws IOException, InterruptedException {
-        String resourcePath = "shared_builder";
-        final CountDownLatch startLatch = new CountDownLatch(3);
-        final CountDownLatch finishLatch = new CountDownLatch(3);
-
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
-        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
-        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
-        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
-        t1.start();
-        t2.start();
-        t3.start();
-        startLatch.await();
-        AppendTrieDictionary dict = builder.build(0);
-        assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
-        assertEquals(110010, dict.getMaxId());
-        try {
-            builder.addValue("fail");
-            fail("Builder should be closed");
-        } catch (Exception e) {}
-
-        builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
-        builder.addValue("success");
-        dict = builder.build(0);
-        for (int i = 0; i < 10000; i ++) {
-            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
-        }
-        for (int i = 0; i < 10; i ++) {
-            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
-        }
-        for (int i = 0; i < 100000; i ++) {
-            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
-        }
-        assertEquals(110011, dict.getIdFromValue("success"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
deleted file mode 100644
index 3c29d9c..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-/**
- * Created by sunyerui on 16/7/12.
- */
-public class CachedTreeMapTest {
-
-    public static class Key implements WritableComparable {
-        int keyInt;
-
-        public static Key of(int keyInt) {
-            Key newKey = new Key();
-            newKey.keyInt = keyInt;
-            return newKey;
-        }
-
-        @Override
-        public int compareTo(Object o) {
-            return keyInt - ((Key)o).keyInt;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            out.writeInt(keyInt);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            keyInt = in.readInt();
-        }
-
-        @Override
-        public String toString() {
-            return String.valueOf(keyInt);
-        }
-    }
-
-    public static boolean VALUE_WRITE_ERROR_TOGGLE = false;
-    public static class Value implements Writable {
-        String valueStr;
-
-        public static Value of(String valueStr) {
-            Value newValue = new Value();
-            newValue.valueStr = valueStr;
-            return newValue;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            if (VALUE_WRITE_ERROR_TOGGLE) {
-                out.write(new byte[0]);
-                return;
-            }
-            out.writeUTF(valueStr);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            valueStr = in.readUTF();
-        }
-    }
-
-    public static class CachedFileFilter implements FileFilter {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.getName().startsWith(CachedTreeMap.CACHED_PREFIX);
-        }
-    }
-
-    public static class VersionFilter implements FileFilter {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.getName().startsWith(CachedTreeMap.VERSION_PREFIX);
-        }
-    }
-
-
-    static final UUID uuid = UUID.randomUUID();
-    static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid;
-    static final String workingDir = baseDir + "/working";
-
-    private static void cleanup() {
-        Path basePath = new Path(baseDir);
-        try {
-            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
-        } catch (IOException e) {}
-        VALUE_WRITE_ERROR_TOGGLE = false;
-    }
-
-    @After
-    public void afterTest() {
-        cleanup();
-    }
-
-    @AfterClass
-    public static void tearDown() {
-        cleanup();
-    }
-
-    @Test
-    public void testCachedTreeMap() throws IOException {
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-
-        File dir = new File(workingDir);
-        assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
-
-        flushAndCommit(map, true, true, false);
-        assertFalse(new File(workingDir).exists());
-
-        dir = new File(map.getLatestVersion());
-        assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(5, map2.size());
-        assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
-        try {
-            map2.put(Key.of(6), Value.of("f"));
-            fail("Should be error when put value into immutable map");
-        } catch (AssertionError error) {}
-    }
-
-    @Test
-    public void testMultiVersions() throws IOException, InterruptedException {
-        CachedTreeMap map = createMutableMap();
-        Thread.sleep(3000);
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        flushAndCommit(map, true, true, false);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
-        // re-open dict, append new data
-        map = createMutableMap();
-        map.put(Key.of(4), Value.of("d"));
-        flushAndCommit(map, true, true, true);
-
-        // new data is not visible for map2
-        assertNull(map2.get(Key.of(4)));
-
-        // append data, and be visible for new immutable map
-        map.put(Key.of(5), Value.of("e"));
-        flushAndCommit(map, true, true, true);
-
-        CachedTreeMap map3 = createImmutableMap();
-        assertEquals("d", ((Value)map3.get(Key.of(4))).valueStr);
-        assertEquals("e", ((Value)map3.get(Key.of(5))).valueStr);
-
-        // Check versions retention
-        File dir = new File(baseDir);
-        assertEquals(3, dir.listFiles(new VersionFilter()).length);
-    }
-
-    @Test
-    public void testKeepAppend() throws IOException {
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-
-        // flush with keepAppend false, map can't be append
-        flushAndCommit(map, true, true, false);
-        // append into map has closed
-        try {
-            map.put(Key.of(6), Value.of("f"));
-            fail();
-        } catch (AssertionError e) {
-            assertEquals("Only support put method with immutable false and keepAppend true", e.getMessage());
-        }
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
-        assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
-        map = createMutableMap();
-        map.put(Key.of(6), Value.of("f"));
-        map.put(Key.of(7), Value.of("g"));
-        map.put(Key.of(8), Value.of("h"));
-        // flush with keepAppend true
-        flushAndCommit(map, true, true, true);
-        map.put(Key.of(9), Value.of("i"));
-        // can still append data
-        flushAndCommit(map, true, true, false);
-
-        map2 = createImmutableMap();
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
-        assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr);
-        assertEquals("i", ((Value)map2.get(Key.of(9))).valueStr);
-    }
-
-    @Test
-    public void testVersionRetention() throws IOException, InterruptedException {
-        File dir = new File(baseDir);
-        // TTL for 3s and keep 3 versions
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
-            .maxVersions(3).versionTTL(1000 * 3).build();
-        map.put(Key.of(1), Value.of("a"));
-
-        // has version 0 when create map
-        assertEquals(1, dir.listFiles(new VersionFilter()).length);
-        Thread.sleep(2500);
-
-        // flush version 1
-        flushAndCommit(map, true, true, true);
-        assertEquals(2, dir.listFiles(new VersionFilter()).length);
-
-        // flush version 2
-        flushAndCommit(map, true, true, true);
-        assertEquals(3, dir.listFiles(new VersionFilter()).length);
-
-        // flush version 3
-        flushAndCommit(map, true, true, true);
-        // won't delete version since 3s TTL
-        assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
-        // sleep to make version 0 expired
-        Thread.sleep(500);
-        // flush verion 4
-        flushAndCommit(map, true, true, false);
-        assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
-        // TTL for 100ms and keep 2 versions
-        map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
-            .maxVersions(2).versionTTL(100).build();
-        flushAndCommit(map, true, true, false);
-        assertEquals(2, dir.listFiles(new VersionFilter()).length);
-    }
-
-    @Test
-    public void testWithOldFormat() throws IOException {
-        File dir = new File(baseDir);
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-        flushAndCommit(map, true, true, true);
-
-        // move version dir to base dir, to simulate the older format
-        Path versionPath = new Path(map.getLatestVersion());
-        Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
-        FileSystem fs = HadoopUtil.getFileSystem(versionPath);
-        fs.rename(versionPath, tmpVersionPath);
-        fs.delete(new Path(baseDir), true);
-        fs.rename(tmpVersionPath, new Path(baseDir));
-        assertEquals(0, dir.listFiles(new VersionFilter()).length);
-        assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(5, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
-        assertEquals(1, dir.listFiles(new VersionFilter()).length);
-        assertEquals(0, dir.listFiles(new CachedFileFilter()).length);
-    }
-
-    @Test
-    public void testWriteFailed() throws IOException {
-        // normal case
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.remove(Key.of(3));
-        map.put(Key.of(4), Value.of("d"));
-
-        flushAndCommit(map, true, true, false);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(3, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
-        // suppose write value failed and didn't commit data
-        map = createMutableMap();
-        VALUE_WRITE_ERROR_TOGGLE = true;
-        map.put(Key.of(1), Value.of("aa"));
-        map.put(Key.of(2), Value.of("bb"));
-        VALUE_WRITE_ERROR_TOGGLE = false;
-        map.put(Key.of(3), Value.of("cc"));
-        map.put(Key.of(4), Value.of("dd"));
-        // suppose write value failed and didn't commit data
-        flushAndCommit(map, true, false, false);
-
-        // read map data should not be modified
-        map2 = createImmutableMap();
-        assertEquals(3, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
-        assertTrue(new File(workingDir).exists());
-    }
-
-    private CachedTreeMap createImmutableMap() throws IOException {
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
-        try (DataInputStream in = map.openIndexInput()) {
-            map.readFields(in);
-        }
-        return map;
-    }
-
-    private CachedTreeMap createMutableMap() throws IOException {
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).maxVersions(3).versionTTL(1000 * 3).keyClazz(Key.class).valueClazz(Value.class).build();
-        try (DataInputStream in = map.openIndexInput()) {
-            map.readFields(in);
-        } catch (IOException e) {}
-        return map;
-    }
-
-    private void flushAndCommit(CachedTreeMap map, boolean doFlush, boolean doCommit, boolean keepAppend) throws IOException {
-        if (doFlush) {
-            try (DataOutputStream out = map.openIndexOutput()) {
-                map.write(out);
-            }
-        }
-
-        if (doCommit) {
-            map.commit(keepAppend);
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java
new file mode 100644
index 0000000..f6ba75a
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.global;
+
+import static org.apache.kylin.dict.global.GlobalDictHDFSStore.V2_INDEX_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.AppendTrieDictionary;
+import org.apache.kylin.dict.BytesConverter;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
+    private static final String RESOURCE_DIR = "/dict/append_dict_test/" + 
UUID.randomUUID();
+    private static String BASE_DIR;
+    private static String LOCAL_BASE_DIR = 
"/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
+
+    /**
+     * Prepares a test run: creates the local test metadata, sets the
+     * append-dictionary entry size to 50000 and derives {@code BASE_DIR} from
+     * the configured HDFS working directory.
+     */
+    @Before
+    public void beforeTest() {
+        staticCreateTestMetadata();
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.dictionary.append-entry-size", "50000");
+
+        String workingDir = config.getHdfsWorkingDirectory();
+        BASE_DIR = workingDir + "/resources/GlobalDict" + RESOURCE_DIR + "/";
+    }
+
+    /**
+     * Tears a test down: removes the dictionary directory, then releases the
+     * test metadata. The metadata teardown sits in a {@code finally} block so
+     * that it still runs when the directory cleanup throws an unchecked
+     * exception.
+     */
+    @After
+    public void afterTest() {
+        try {
+            cleanup();
+        } finally {
+            staticCleanupTestMetadata();
+        }
+    }
+
+    /**
+     * Recursively deletes {@code BASE_DIR}. Deletion is best effort: a failure
+     * never fails the test, but it is reported on stderr instead of being
+     * dropped silently.
+     */
+    private void cleanup() {
+        Path basePath = new Path(BASE_DIR);
+        try {
+            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
+        } catch (IOException e) {
+            // All tests of this class share the same RESOURCE_DIR, so a directory
+            // that survives here may skew the file counts asserted by later tests.
+            System.err.println("Failed to delete " + basePath + ": " + e);
+        }
+    }
+
+    private static final String[] words = new String[] { "paint", "par", 
"part", "parts", "partition", "partitions", "party", "partie", "parties", 
"patient", "taste", "tar", "trie", "try", "tries", "字典", "字典树", 
"字母", // non-ascii characters
+            "", // empty
+            
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
 
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+            
"paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                    + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
 + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
+            "paint", "tar", "try", // some dup
+    };
+
+    /**
+     * Creates a builder for the dictionary stored under {@code resourceDir},
+     * using the append-dictionary entry size currently held by the
+     * {@link KylinConfig} of this environment.
+     *
+     * @param resourceDir resource directory passed to the builder
+     * @return a new builder
+     * @throws IOException if the builder cannot be created
+     */
+    private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        return new AppendTrieDictionaryBuilder(resourceDir, config.getAppendDictEntrySize());
+    }
+
+    /**
+     * Runs the shuffled append-and-verify scenario 50 times over the
+     * {@code words} fixture, deleting the dictionary directory after each round.
+     */
+    @Test
+    public void testStringRepeatly() throws IOException {
+        ArrayList<String> values = new ArrayList<>(Arrays.asList(words));
+        // values that are never added (several are prefixes of stored words)
+        ArrayList<String> absent = new ArrayList<>(Arrays.asList("pa", "pars", "tri", "字"));
+
+        int round = 0;
+        while (round < 50) {
+            testStringDictAppend(values, absent, true);
+            //to speed up the test
+            cleanup();
+            round++;
+        }
+    }
+
+    /** Appends the English word list in file order (no shuffling) and verifies the assigned ids. */
+    @Test
+    public void testEnglishWords() throws Exception {
+        String wordFile = "src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt";
+        ArrayList<String> englishWords = loadStrings(new FileInputStream(wordFile));
+        testStringDictAppend(englishWords, null, false);
+    }
+
+    /** Appends the category-name fixture in shuffled order and verifies the assigned ids. */
+    @Test
+    public void testCategoryNames() throws Exception {
+        String categoryFile = "src/test/resources/dict/dw_category_grouping_names.dat";
+        ArrayList<String> categoryNames = loadStrings(new FileInputStream(categoryFile));
+        testStringDictAppend(categoryNames, null, true);
+    }
+
+    /**
+     * Reads {@code is} as UTF-8 text and returns every non-empty line, trimmed,
+     * in the order it appears.
+     *
+     * @param is the stream to read; it is closed by this method
+     * @return the trimmed, non-empty lines
+     * @throws Exception if reading or closing the stream fails
+     */
+    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+        ArrayList<String> lines = new ArrayList<String>();
+        // try-with-resources closes the reader and then the stream, including the
+        // case where constructing the reader itself fails.
+        try (InputStream in = is;
+                BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                String word = line.trim();
+                if (!word.isEmpty()) {
+                    lines.add(word);
+                }
+            }
+        }
+        return lines;
+    }
+
+    /**
+     * Streams every non-empty, trimmed line of the {@code huge_key} fixture into
+     * a builder, builds the dictionary and dumps it to stdout. Ignored by
+     * default because it needs a huge key set.
+     */
+    @Ignore("need huge key set")
+    @Test
+    public void testHugeKeySet() throws IOException {
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
+        // try-with-resources closes reader and stream even if reading fails part-way
+        try (InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
+                BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                String word = line.trim();
+                if (!word.isEmpty()) {
+                    builder.addValue(word);
+                }
+            }
+        }
+
+        AppendTrieDictionary<String> dict = builder.build(0);
+        dict.dump(System.out);
+    }
+
+    /**
+     * Builds the dictionary in three appends, re-opening the builder before each
+     * one, and checks after every build that
+     * <ul>
+     * <li>every value appended so far resolves to an id other than -1,</li>
+     * <li>values verified after an earlier build still map to the same id,</li>
+     * <li>newly appended values do not reuse an id recorded for a different value.</li>
+     * </ul>
+     * Finally the values in {@code notfound} must resolve to -1 and a
+     * serialization round trip must preserve all ids.
+     *
+     * @param list values to append; needs at least two elements because the
+     *        split points come from {@link Random#nextInt(int)}, which requires
+     *        a positive bound
+     * @param notfound values expected to be absent, or {@code null} to skip that check
+     * @param shuffleList whether to shuffle a copy of {@code list} before appending
+     * @throws IOException if building the dictionary fails
+     */
+    private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+        // Report the seed so that a failing shuffle/split combination can be replayed.
+        long seed = System.currentTimeMillis();
+        System.out.println("testStringDictAppend seed: " + seed);
+        Random rnd = new Random(seed);
+
+        ArrayList<String> strList = new ArrayList<String>(list);
+        if (shuffleList) {
+            Collections.shuffle(strList, rnd);
+        }
+        BytesConverter converter = new StringBytesConverter();
+        TreeMap<Integer, String> checkMap = new TreeMap<>();
+
+        int firstAppend = rnd.nextInt(strList.size() / 2);
+        int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
+
+        // first append: every value verified here is new
+        AppendTrieDictionary<String> dict = appendRange(strList, 0, firstAppend);
+        dict.dump(System.out);
+        verifyIds(dict, converter, strList, 0, firstAppend, checkMap);
+
+        // reopen dict and append
+        AppendTrieDictionary<String> newDict = appendRange(strList, firstAppend, secondAppend);
+        if (!newDict.equals(dict)) {
+            fail("Re-opened dictionary should equal the previously built one");
+        }
+        dict = newDict;
+        dict.dump(System.out);
+        verifyIds(dict, converter, strList, firstAppend, secondAppend, checkMap);
+
+        // reopen dict and append rest str
+        newDict = appendRange(strList, secondAppend, strList.size());
+        if (!newDict.equals(dict)) {
+            fail("Re-opened dictionary should equal the previously built one");
+        }
+        dict = newDict;
+        dict.dump(System.out);
+        verifyIds(dict, converter, strList, secondAppend, strList.size(), checkMap);
+
+        if (notfound != null) {
+            for (String s : notfound) {
+                byte[] bytes = converter.convertToBytes(s);
+                int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
+                assertEquals(-1, id);
+            }
+        }
+
+        // after a serialization round trip every value must keep its recorded id
+        dict = testSerialize(dict, converter);
+        verifyIds(dict, converter, strList, strList.size(), strList.size(), checkMap);
+    }
+
+    /** Re-opens the dictionary, appends {@code values[from, to)} and builds it. */
+    private AppendTrieDictionary<String> appendRange(List<String> values, int from, int to) throws IOException {
+        AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR);
+        for (int i = from; i < to; i++) {
+            b.addValue(values.get(i));
+        }
+        return b.build(0);
+    }
+
+    /**
+     * Looks up {@code values[0, checkCount)} in {@code dict}. The first
+     * {@code knownCount} values must map to the id already recorded in
+     * {@code checkMap}; the remaining ones must not collide with an id recorded
+     * for a different value and are then recorded.
+     */
+    private static void verifyIds(AppendTrieDictionary<String> dict, BytesConverter converter, List<String> values, int knownCount, int checkCount, Map<Integer, String> checkMap) throws IOException {
+        for (int i = 0; i < checkCount; i++) {
+            String str = values.get(i);
+            byte[] bytes = converter.convertToBytes(str);
+            int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0);
+            assertNotEquals(String.format("Value %s not exist", str), -1, id);
+            if (i < knownCount) {
+                assertEquals("Expect id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+            } else {
+                assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+                checkMap.put(id, str);
+            }
+        }
+    }
+
+    /**
+     * Writes {@code dict} to an in-memory buffer and reads it back into a fresh
+     * dictionary instance.
+     *
+     * @param dict dictionary to serialize
+     * @param converter not used by this method
+     * @return the dictionary read back from the serialized bytes
+     * @throws RuntimeException wrapping any {@link IOException} raised on the way
+     */
+    private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) {
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            try (DataOutputStream out = new DataOutputStream(buffer)) {
+                dict.write(out);
+            }
+
+            AppendTrieDictionary<String> copy = new AppendTrieDictionary<String>();
+            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
+                copy.readFields(in);
+            }
+            return copy;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Starts id assignment at {@code Integer.MAX_VALUE - 2} and expects the ids
+     * of the four values to run past {@code Integer.MAX_VALUE} into the negative
+     * range.
+     */
+    @Test
+    public void testMaxInteger() throws IOException {
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.setMaxId(Integer.MAX_VALUE - 2);
+        for (String value : new String[] { "a", "ab", "acd", "ac" }) {
+            builder.addValue(value);
+        }
+        AppendTrieDictionary<String> dict = builder.build(0);
+
+        assertEquals(Integer.MAX_VALUE - 1, dict.getIdFromValue("a", 0));
+        assertEquals(Integer.MAX_VALUE, dict.getIdFromValue("ab", 0));
+        assertEquals(Integer.MIN_VALUE + 1, dict.getIdFromValue("ac", 0));
+        assertEquals(Integer.MIN_VALUE, dict.getIdFromValue("acd", 0));
+    }
+
+    /**
+     * Adds 10000 values of growing length ("aa", "aaa", ...) and reports the
+     * iteration at which a {@link StackOverflowError} is raised, if any, before
+     * rethrowing it.
+     */
+    @Ignore("Only occurred when value is very long (>8000 bytes)")
+    @Test
+    public void testSuperLongValue() throws IOException {
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        StringBuilder value = new StringBuilder("a");
+        for (int i = 0; i < 10000; i++) {
+            value.append("a");
+            try {
+                builder.addValue(value.toString());
+            } catch (StackOverflowError e) {
+                System.out.println("\nstack overflow " + i);
+                throw e;
+            }
+        }
+        AppendTrieDictionary<String> dictionary = builder.build(0);
+        dictionary.getMaxId();
+    }
+
+    /** Builds a dictionary from "a" plus one very long URL-encoded value; passes if the build does not throw. */
+    @Test
+    public void testSplitContainSuperLongValue() throws IOException {
+        List<String> values = new ArrayList<>();
+        values.add("a");
+        values.add("%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B");
+
+        createAppendTrieDict(values);
+    }
+
+    /** Builds a dictionary from "a" plus one very long URL-encoded value; passes if the build does not throw. */
+    @Test
+    public void testSuperLongValueAsFileName() throws IOException {
+        List<String> values = new ArrayList<>();
+        values.add("a");
+        values.add("%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B");
+
+        createAppendTrieDict(values);
+    }
+
+    /** Builds a dictionary from the values "::" and ":"; passes if the build does not throw. */
+    @Test
+    public void testIllegalFileNameValue() throws IOException {
+        List<String> colonValues = Arrays.asList("::", ":");
+        createAppendTrieDict(colonValues);
+    }
+
+    /** Builds a dictionary without adding any value; passes if the build does not throw. */
+    @Test
+    public void testSkipAddValue() throws IOException {
+        List<String> noValues = Collections.<String> emptyList();
+        createAppendTrieDict(noValues);
+    }
+
+    /**
+     * Sets the append-dictionary entry size to 1, then adds every value of
+     * {@code valueList} to a fresh builder and builds the dictionary.
+     *
+     * @param valueList values to add, in order; may be empty
+     * @throws IOException if creating the builder, adding a value or building fails
+     */
+    private void createAppendTrieDict(List<String> valueList) throws IOException {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.dictionary.append-entry-size", "1");
+
+        AppendTrieDictionaryBuilder dictBuilder = createBuilder(RESOURCE_DIR);
+        for (String entry : valueList) {
+            dictBuilder.addValue(entry);
+        }
+        dictBuilder.build(0);
+    }
+
+    /** Accepts files whose name starts with {@code "cached_"}. */
+    private static class CachedFileFilter implements FileFilter {
+        @Override
+        public boolean accept(File pathname) {
+            String fileName = pathname.getName();
+            return fileName.startsWith("cached_");
+        }
+    }
+
+    /** Accepts files whose name starts with {@code GlobalDictHDFSStore.VERSION_PREFIX}. */
+    private static class VersionFilter implements FileFilter {
+        @Override
+        public boolean accept(File pathname) {
+            String fileName = pathname.getName();
+            return fileName.startsWith(GlobalDictHDFSStore.VERSION_PREFIX);
+        }
+    }
+
+    /**
+     * Builds a dictionary, re-opens it to append more values and checks that
+     * the already built dictionary does not see a value added to the re-opened
+     * builder, that the rebuilt dictionary assigns the expected ids to the new
+     * values, and that two version directories exist afterwards.
+     */
+    @Test
+    public void testMultiVersions() throws IOException, InterruptedException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        for (String value : new String[] { "a", "b", "c", "d", "e", "f" }) {
+            builder.addValue(value);
+        }
+        AppendTrieDictionary<String> dict = builder.build(0);
+
+        assertEquals(2, dict.getIdFromValue("b"));
+
+        // re-open dict, append new data
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("g");
+
+        // new data is not visible
+        try {
+            dict.getIdFromValue("g");
+            fail("Value 'g' should not be visible before the new version is built");
+        } catch (IllegalArgumentException expected) {
+            // expected: the lookup of the not yet built value is rejected
+        }
+
+        // append data, and be visible for new immutable map
+        builder.addValue("h");
+
+        AppendTrieDictionary<String> newDict = builder.build(0);
+        // explicit check instead of a plain `assert`, which is skipped without -ea
+        if (!newDict.equals(dict)) {
+            fail("Re-opened dictionary should equal the previously built one");
+        }
+
+        assertEquals(7, newDict.getIdFromValue("g"));
+        assertEquals(8, newDict.getIdFromValue("h"));
+
+        // Check versions retention
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(2, dir.listFiles(new VersionFilter()).length);
+    }
+
+    /**
+     * With max-versions set to 1 and a version TTL of 1000, builds one version,
+     * waits 1200 ms, builds a second version and expects exactly one version
+     * directory after each build.
+     */
+    @Test
+    public void testVersionRetention() throws IOException, InterruptedException {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.dictionary.append-entry-size", "4");
+        config.setProperty("kylin.dictionary.append-max-versions", "1");
+        config.setProperty("kylin.dictionary.append-version-ttl", "1000");
+
+        //version 1
+        AppendTrieDictionaryBuilder firstBuilder = createBuilder(RESOURCE_DIR);
+        firstBuilder.addValue("a");
+        firstBuilder.build(0);
+
+        // Check versions retention
+        File dir = new File(LOCAL_BASE_DIR);
+        File[] versionsAfterFirstBuild = dir.listFiles(new VersionFilter());
+        assertEquals(1, versionsAfterFirstBuild.length);
+
+        // sleep to make version 1 expired
+        Thread.sleep(1200);
+
+        //version 2
+        AppendTrieDictionaryBuilder secondBuilder = createBuilder(RESOURCE_DIR);
+        secondBuilder.addValue("");
+        secondBuilder.build(0);
+
+        // Check versions retention
+        File[] versionsAfterSecondBuild = dir.listFiles(new VersionFilter());
+        assertEquals(1, versionsAfterSecondBuild.length);
+    }
+
+    /**
+     * Builds a dictionary, rearranges its directory into the older layout and
+     * expects no version directory and three {@code cached_} files there; then
+     * expects one version directory after a builder has been created and built
+     * on top of that layout.
+     */
+    @Test
+    public void testOldDirFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        for (String value : new String[] { "a", "b", "c", "d", "e", "f" }) {
+            builder.addValue(value);
+        }
+        builder.build(0);
+
+        convertDirToOldFormat(BASE_DIR);
+
+        File dir = new File(LOCAL_BASE_DIR);
+        File[] versionDirs = dir.listFiles(new VersionFilter());
+        assertEquals(0, versionDirs.length);
+        File[] cachedFiles = dir.listFiles(new CachedFileFilter());
+        assertEquals(3, cachedFiles.length);
+
+        //convert older format to new format when builder init
+        builder = createBuilder(RESOURCE_DIR);
+        builder.build(0);
+
+        versionDirs = dir.listFiles(new VersionFilter());
+        assertEquals(1, versionDirs.length);
+    }
+
+    /**
+     * Simulates the older directory layout by replacing {@code baseDir} with
+     * the content of its latest version directory.
+     *
+     * @param baseDir dictionary base directory to rearrange
+     * @throws IOException if a file system call fails or a rename is reported
+     *         as unsuccessful
+     */
+    private void convertDirToOldFormat(String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+        // move version dir to base dir, to simulate the older format
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        Long[] versions = store.listAllVersions();
+        Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+        // park the version dir next to the base dir's parent while the base dir is removed
+        Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
+
+        // rename() signals failure through its return value; surface it here instead
+        // of letting a later file-count assertion fail for an unrelated-looking reason
+        if (!fs.rename(versionPath, tmpVersionPath)) {
+            throw new IOException("Failed to rename " + versionPath + " to " + tmpVersionPath);
+        }
+        fs.delete(basePath, true);
+        if (!fs.rename(tmpVersionPath, basePath)) {
+            throw new IOException("Failed to rename " + tmpVersionPath + " to " + basePath);
+        }
+    }
+
+    /**
+     * Builds a dictionary, rewrites its latest version into the v1 index and
+     * file-name format, appends three more values and expects the ids of an old
+     * and a new value to be 1 and 7.
+     */
+    @Test
+    public void testOldIndexFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        for (String value : new String[] { "a", "b", "c", "d", "e", "f" }) {
+            builder.addValue(value);
+        }
+        builder.build(0);
+
+        convertIndexToOldFormat(BASE_DIR);
+
+        builder = createBuilder(RESOURCE_DIR);
+        for (String value : new String[] { "g", "h", "i" }) {
+            builder.addValue(value);
+        }
+        AppendTrieDictionary<String> dict = builder.build(0);
+
+        assertEquals(1, dict.getIdFromValue("a"));
+        assertEquals(7, dict.getIdFromValue("g"));
+    }
+
+    /**
+     * Rewrites the latest version under {@code baseDir} into the v1 format: the
+     * v2 index file is deleted, a v1 index is written from the same metadata and
+     * every slice file is renamed to {@code "cached_" + sliceKey}.
+     *
+     * @param baseDir dictionary base directory
+     * @throws IOException if a file system or store call fails
+     */
+    private void convertIndexToOldFormat(String baseDir) throws IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(new Path(baseDir));
+
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        Long[] versions = store.listAllVersions();
+        Long latestVersion = versions[versions.length - 1];
+        GlobalDictMetadata metadata = store.getMetadata(latestVersion);
+
+        //convert v2 index to v1 index
+        Path versionPath = store.getVersionDir(latestVersion);
+        fs.delete(new Path(versionPath, V2_INDEX_NAME), true);
+        GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration());
+        indexFormatV1.writeIndexFile(versionPath, metadata);
+
+        //convert v2 fileName format to v1 fileName format
+        for (Map.Entry<AppendDictSliceKey, String> slice : metadata.sliceFileMap.entrySet()) {
+            Path v2SliceFile = new Path(versionPath, slice.getValue());
+            Path v1SliceFile = new Path(versionPath, "cached_" + slice.getKey());
+            fs.rename(v2SliceFile, v1SliceFile);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 403abc4..8b6b5aa 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -187,8 +187,8 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     }
 
     @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final 
JobLock jobLock) throws SchedulerException {
-        this.jobLock = jobLock;
+    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock 
lock) throws SchedulerException {
+        jobLock = lock;
         
         String serverMode = jobEngineConfig.getConfig().getServerMode();
         if (!("job".equals(serverMode.toLowerCase()) || 
"all".equals(serverMode.toLowerCase()))) {
@@ -205,7 +205,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
 
         this.jobEngineConfig = jobEngineConfig;
 
-        if (jobLock.lock() == false) {
+        if (jobLock.lockJobEngine() == false) {
             throw new IllegalStateException("Cannot start job scheduler due to 
lack of job lock");
         }
 
@@ -226,7 +226,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     @Override
     public void shutdown() throws SchedulerException {
         logger.info("Shutingdown Job Engine ....");
-        jobLock.unlock();
+        jobLock.unlockJobEngine();
         fetcherPool.shutdown();
         jobPool.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 1f2e958..83f8964 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
@@ -36,6 +38,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -70,7 +73,8 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     private ExecutorService watchPool;
     private ExecutorService jobPool;
     private DefaultContext context;
-    private DistributedJobLock jobLock;
+    private DistributedLock jobLock;
+    private Closeable lockWatch;
 
     private static final Logger logger = 
LoggerFactory.getLogger(DistributedScheduler.class);
     private static final ConcurrentMap<KylinConfig, DistributedScheduler> 
CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -81,6 +85,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     private JobEngineConfig jobEngineConfig;
 
     private final static String SEGMENT_ID = "segmentId";
+    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
 
     //only for it test
     public static DistributedScheduler getInstance(KylinConfig config) {
@@ -149,7 +154,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
 
     private String serverName = getServerName();
 
-    private String getServerName() {
+    public String getServerName() {
         String serverName = null;
         try {
             serverName = InetAddress.getLocalHost().getHostName();
@@ -177,7 +182,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Job %s", 
executable.getId())) {
                 String segmentId = executable.getParam(SEGMENT_ID);
-                if (jobLock.lockWithName(segmentId, serverName)) {
+                if (jobLock.lockPath(getLockPath(segmentId), serverName)) {
                     logger.info(executable.toString() + " scheduled in server: 
" + serverName);
 
                     context.addRunningJob(executable);
@@ -205,7 +210,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
                 if (state != ExecutableState.READY && state != 
ExecutableState.RUNNING) {
                     if (segmentWithLocks.contains(segmentId)) {
                         logger.info(executable.toString() + " will release the 
lock for the segment: " + segmentId);
-                        jobLock.unlockWithName(segmentId);
+                        jobLock.unlockPath(getLockPath(segmentId));
                         segmentWithLocks.remove(segmentId);
                     }
                 }
@@ -214,15 +219,15 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     }
 
     //when the segment lock released but the segment related job still 
running, resume the job.
-    private class DoWatchImpl implements 
org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock {
+    private class WatcherProcessImpl implements 
org.apache.kylin.common.lock.DistributedLock.Watcher {
         private String serverName;
 
-        public DoWatchImpl(String serverName) {
+        public WatcherProcessImpl(String serverName) {
             this.serverName = serverName;
         }
 
         @Override
-        public void doWatch(String path, String nodeData) {
+        public void process(String path, String nodeData) {
             String[] paths = path.split("/");
             String segmentId = paths[paths.length - 1];
 
@@ -233,7 +238,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
                     if (executable instanceof DefaultChainedExecutable && 
executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && 
!nodeData.equalsIgnoreCase(serverName)) {
                         try {
                             logger.warn(nodeData + " has released the lock 
for: " + segmentId + " but the job still running. so " + serverName + " resume 
the job");
-                            if (!jobLock.isHasLocked(segmentId)) {
+                            if (!jobLock.isPathLocked(getLockPath(segmentId))) 
{
                                 
executableManager.resumeRunningJobForce(executable.getId());
                                 fetcherPool.schedule(fetcher, 0, 
TimeUnit.SECONDS);
                                 break;
@@ -260,7 +265,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     }
 
     @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final 
JobLock jobLock) throws SchedulerException {
+    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock 
jobLock) throws SchedulerException {
         String serverMode = jobEngineConfig.getConfig().getServerMode();
         if (!("job".equals(serverMode.toLowerCase()) || 
"all".equals(serverMode.toLowerCase()))) {
             logger.info("server mode: " + serverMode + ", no need to run job 
scheduler");
@@ -283,18 +288,18 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
         //watch the zookeeper node change, so that when one job server is down, other job servers can take over.
         watchPool = Executors.newFixedThreadPool(1);
-        DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
-        this.jobLock.watchLock(watchPool, doWatchImpl);
+        WatcherProcessImpl watcherProcess = new 
WatcherProcessImpl(this.serverName);
+        lockWatch = this.jobLock.watchPath(getWatchPath(), watchPool, 
watcherProcess);
 
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
         jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 
Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
         context = new DefaultContext(Maps.<String, Executable> 
newConcurrentMap(), jobEngineConfig.getConfig());
 
-        resumeAllRunningJobs();
-
         fetcher = new FetcherRunner();
         fetcherPool.scheduleAtFixedRate(fetcher, 10, 
ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
         hasStarted = true;
+
+        resumeAllRunningJobs();
     }
 
     private void resumeAllRunningJobs() {
@@ -303,7 +308,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             AbstractExecutable executable = executableManager.getJob(id);
             if (output.getState() == ExecutableState.RUNNING && executable 
instanceof DefaultChainedExecutable) {
                 try {
-                    if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) 
{
+                    if 
(!jobLock.isPathLocked(executable.getParam(SEGMENT_ID))) {
                         
executableManager.resumeRunningJobForce(executable.getId());
                         fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                     }
@@ -314,16 +319,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         }
     }
 
+    public String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    private String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+
     @Override
     public void shutdown() throws SchedulerException {
         logger.info("Will shut down Job Engine ....");
 
+        try {
+            lockWatch.close();
+        } catch (IOException e) {
+            throw new SchedulerException(e);
+        }
+
         releaseAllLocks();
         logger.info("The all locks has released");
 
-        watchPool.shutdown();
-        logger.info("The watchPool has down");
-
         fetcherPool.shutdown();
         logger.info("The fetcherPool has down");
 
@@ -333,7 +349,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
     private void releaseAllLocks() {
         for (String segmentId : segmentWithLocks) {
-            jobLock.unlockWithName(segmentId);
+            jobLock.unlockPath(getLockPath(segmentId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
index 1c173ec..e5e2a1e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -18,19 +18,7 @@
 
 package org.apache.kylin.job.lock;
 
-import java.util.concurrent.ExecutorService;
+import org.apache.kylin.common.lock.DistributedLock;
 
-public interface DistributedJobLock extends JobLock {
-    
-    boolean lockWithName(String name, String serverName);
-
-    boolean isHasLocked(String segmentId);
-
-    void unlockWithName(String name);
-
-    void watchLock(ExecutorService pool, DoWatchLock doWatch);
-    
-    public interface DoWatchLock {
-        void doWatch(String path, String data);
-    }
+public interface DistributedJobLock extends JobLock, DistributedLock {
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
index bbfb801..1b6b29e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -19,9 +19,12 @@
 package org.apache.kylin.job.lock;
 
 /**
+ * Among a Kylin cluster, usually only one node runs as the job engine and does the scheduling of build jobs.
+ * This interface is for such negotiation. 
  */
 public interface JobLock {
-    boolean lock();
+    
+    boolean lockJobEngine();
 
-    void unlock();
+    void unlockJobEngine();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
index cac17b9..73f6192 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -22,12 +22,12 @@ package org.apache.kylin.job.lock;
  */
 public class MockJobLock implements JobLock {
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() {
         return true;
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() {
         return;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index daa1053..08a8cb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -156,9 +156,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.model.SelectRule.class);
         kyroClasses.add(org.apache.kylin.cube.model.v1_4_0.CubeDesc.class);
         kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.class);
-        
kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictNode.class);
-        
kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSlice.class);
-        
kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSliceKey.class);
         kyroClasses.add(org.apache.kylin.dict.CacheDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DateStrDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DictionaryInfo.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 9f7b24c..969e466 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -136,4 +136,4 @@ kylin.engine.mr.config-override.test2=test2
 kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 
 kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
-kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
new file mode 100644
index 0000000..df2ebf7
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
+    private DictionaryInfo dictionaryInfo;
+
+    @Before
+    public void beforeTest() throws Exception {
+        staticCreateTestMetadata();
+        dictionaryInfo = new DictionaryInfo("testTable", "testColumn", 0, 
"String", null);
+    }
+
+    @After
+    public void afterTest() {
+        cleanup();
+        staticCleanupTestMetadata();
+    }
+
+    private void cleanup() {
+        String BASE_DIR = 
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/";
+        Path basePath = new Path(BASE_DIR);
+        try {
+            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testGlobalDictLock() throws IOException, InterruptedException {
+        final CountDownLatch startLatch = new CountDownLatch(3);
+        final CountDownLatch finishLatch = new CountDownLatch(3);
+
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 
10000);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 
10);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 
100000);
+        t1.start();
+        t2.start();
+        t3.start();
+        startLatch.await();
+        finishLatch.await();
+
+        GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+        builder.init(dictionaryInfo, 0);
+        builder.addValue("success");
+        Dictionary<String> dict = builder.build();
+
+        for (int i = 0; i < 10000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+        }
+        for (int i = 0; i < 10; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+        }
+        for (int i = 0; i < 100000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+        }
+
+        assertEquals(110011, dict.getIdFromValue("success"));
+    }
+
+    private class SharedBuilderThread extends Thread {
+        CountDownLatch startLatch;
+        CountDownLatch finishLatch;
+        String prefix;
+        int count;
+
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch 
finishLatch, String prefix, int count) {
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+            this.prefix = prefix;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+            try {
+                GlobalDictionaryBuilder builder = new 
GlobalDictionaryBuilder();
+                startLatch.countDown();
+
+                builder.init(dictionaryInfo, 0);
+                for (int i = 0; i < count; i++) {
+                    builder.addValue(prefix + i);
+                }
+                builder.build();
+                finishLatch.countDown();
+            } catch (IOException e) {
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2d79970..b4ac42f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String 
serverName) {
-        return jobLock.lockWithName(cubeName, serverName);
+        return jobLock.lockPath(getLockPath(cubeName), serverName);
     }
 
     private static void initZk() {
@@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     private String getLockPath(String pathName) {
-        return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + 
kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
+        return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + 
kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
     }
 }

Reply via email to