http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java deleted file mode 100644 index e2af338..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Random; -import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * Created by sunyerui on 16/4/28. 
- */ -public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { - - public static final String BASE_DIR = "file:///tmp/kylin_append_dict"; - public static final String RESOURCE_DIR = "/dict/append_dict_test"; - - @Before - public void setUp() { - staticCreateTestMetadata(); - System.setProperty("kylin.dictionary.append-entry-size", "50000"); - System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR); - } - - @After - public void after() { - cleanup(); - staticCleanupTestMetadata(); - } - - public static void cleanup() { - Path basePath = new Path(BASE_DIR); - try { - HadoopUtil.getFileSystem(basePath).delete(basePath, true); - } catch (IOException e) {} - } - - public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "åå ¸", "åå ¸æ ", "忝", // non-ascii characters - "", // empty - "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", - "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", - "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" - + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", - "paint", "tar", "try", // some dup - }; - - @Test - public void testStringRepeatly() throws IOException { - ArrayList<String> list = new ArrayList<>(); - Collections.addAll(list, words); - ArrayList<String> notfound = new ArrayList<>(); - notfound.add("pa"); - notfound.add("pars"); - notfound.add("tri"); - notfound.add("å"); - for (int i = 0; i < 100; i++) { - testStringDictAppend(list, notfound, true); - } - } - - @Test - public void englishWordsTest() throws Exception { - InputStream is = new 
FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"); - ArrayList<String> str = loadStrings(is); - testStringDictAppend(str, null, false); - } - - @Test - public void categoryNamesTest() throws Exception { - InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"); - ArrayList<String> str = loadStrings(is); - testStringDictAppend(str, null, true); - } - - private static ArrayList<String> loadStrings(InputStream is) throws Exception { - ArrayList<String> r = new ArrayList<String>(); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - try { - String word; - while ((word = reader.readLine()) != null) { - word = word.trim(); - if (word.isEmpty() == false) - r.add(word); - } - } finally { - reader.close(); - is.close(); - } - return r; - } - - @Ignore("need huge key set") - @Test - public void testHugeKeySet() throws IOException { - AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); - AppendTrieDictionary<String> dict = null; - - InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - try { - String word; - while ((word = reader.readLine()) != null) { - word = word.trim(); - if (!word.isEmpty()) - b.addValue(word); - } - } finally { - reader.close(); - is.close(); - } - dict = b.build(0); - dict.dump(System.out); - } - - private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException { - Random rnd = new Random(System.currentTimeMillis()); - ArrayList<String> strList = new ArrayList<String>(); - strList.addAll(list); - if (shuffleList) { - Collections.shuffle(strList, rnd); - } - BytesConverter converter = new StringBytesConverter(); - - AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); - AppendTrieDictionary<String> dict = null; - TreeMap<Integer, String> checkMap = new TreeMap<>(); - int firstAppend = rnd.nextInt(strList.size() / 2); - int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2); - int appendIndex = 0; - int checkIndex = 0; - - for (; appendIndex < firstAppend; appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - dict = b.build(0); - dict.dump(System.out); - for (; checkIndex < firstAppend; checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - - // reopen dict and append -// b = AppendTrieDictionary.Builder.create(dict); - b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); - for (; appendIndex < secondAppend; appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - AppendTrieDictionary newDict = b.build(0); - assert newDict == dict; - dict = newDict; - dict.dump(System.out); - checkIndex = 0; - for (; checkIndex < secondAppend; checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - if (checkIndex < 
firstAppend) { - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } else { - // check second append str, should be new id - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - } - - // reopen dict and append rest str - b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); - for (; appendIndex < strList.size(); appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - newDict = b.build(0); - assert newDict == dict; - dict = newDict; - dict.dump(System.out); - checkIndex = 0; - for (; checkIndex < strList.size(); checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - if (checkIndex < secondAppend) { - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } else { - // check third append str, should be new id - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - } - if (notfound != null) { - for (String s : notfound) { - byte[] bytes = converter.convertToBytes(s); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertEquals(-1, id); - } - } - - dict = testSerialize(dict, converter); - for (String str : strList) { - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } - } - - private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) { - try { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dataout = new DataOutputStream(bout); - dict.write(dataout); - dataout.close(); - ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); - DataInputStream datain = new DataInputStream(bin); - AppendTrieDictionary<String> r = new AppendTrieDictionary<String>(); - r.readFields(datain); - datain.close(); - return r; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testMaxInteger() throws IOException { - AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); - builder.setMaxId(Integer.MAX_VALUE - 2); - builder.addValue("a"); - builder.addValue("ab"); - builder.addValue("acd"); - builder.addValue("ac"); - AppendTrieDictionary dict = builder.build(0); - assertEquals(2147483646, dict.getIdFromValueImpl("a", 0)); - assertEquals(2147483647, dict.getIdFromValueImpl("ab", 0)); - assertEquals(-2147483647, dict.getIdFromValueImpl("ac", 0)); - assertEquals(-2147483648, dict.getIdFromValueImpl("acd", 0)); - } - - @Ignore("Only occurred when value is very long (>8000 bytes)") - @Test - public void testSuperLongValue() throws IOException { - AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); - String value = "a"; - for (int i = 0; i < 10000; i++) { - value += "a"; - try { - builder.addValue(value); - } catch (StackOverflowError 
e) { - System.out.println("\nstack overflow " + i); - throw e; - } - } - AppendTrieDictionary dictionary = builder.build(0); - dictionary.getMaxId(); - } - - private static class SharedBuilderThread extends Thread { - CountDownLatch startLatch; - CountDownLatch finishLatch; - String resourcePath; - String prefix; - int count; - - SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) { - this.startLatch = startLatch; - this.finishLatch = finishLatch; - this.resourcePath = resourcePath; - this.prefix = prefix; - this.count = count; - } - - @Override - public void run() { - try { - AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath); - startLatch.countDown(); - for (int i = 0; i < count; i++) { - builder.addValue(prefix + i); - } - builder.build(0); - finishLatch.countDown(); - } catch (IOException e) {} - } - } - - @Test - public void testSharedBuilder() throws IOException, InterruptedException { - String resourcePath = "shared_builder"; - final CountDownLatch startLatch = new CountDownLatch(3); - final CountDownLatch finishLatch = new CountDownLatch(3); - - AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath); - Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000); - Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10); - Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000); - t1.start(); - t2.start(); - t3.start(); - startLatch.await(); - AppendTrieDictionary dict = builder.build(0); - assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS)); - assertEquals(110010, dict.getMaxId()); - try { - builder.addValue("fail"); - fail("Builder should be closed"); - } catch (Exception e) {} - - builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict); - builder.addValue("success"); - dict = builder.build(0); - for (int i = 0; i < 10000; i ++) { - assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); - } - for (int i = 0; i < 10; i ++) { - assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); - } - for (int i = 0; i < 100000; i ++) { - assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); - } - assertEquals(110011, dict.getIdFromValue("success")); - } -}
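The deleted test above and its rewritten counterpart further down exercise the same append-dictionary lifecycle: build a dictionary, reopen the same resource directory, append more values, and rebuild, with ids assigned earlier staying stable. A minimal sketch of that pattern against the AppendTrieDictionaryBuilder introduced by this commit — assuming a Kylin test environment such as LocalFileMetadataTestCase so that KylinConfig and the HDFS working directory are initialized; the resource path below is hypothetical — could look like:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.dict.AppendTrieDictionary;
    import org.apache.kylin.dict.global.AppendTrieDictionaryBuilder;

    public class AppendDictUsageSketch {
        public static void main(String[] args) throws Exception {
            String resourceDir = "/dict/append_dict_example";   // hypothetical resource path
            int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();

            // First build: values added here are assigned ids starting from 1.
            AppendTrieDictionaryBuilder builder = new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice);
            builder.addValue("a");
            builder.addValue("b");
            AppendTrieDictionary<String> dict = builder.build(0);
            int idB = dict.getIdFromValue("b");                  // id assigned on the first build

            // Reopen the same resource dir and append: old ids are unchanged, new values get new ids.
            builder = new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice);
            builder.addValue("c");
            AppendTrieDictionary<String> appended = builder.build(0);
            int idC = appended.getIdFromValue("c");
            System.out.println(idB + " " + idC);
        }
    }
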
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java deleted file mode 100644 index 3c29d9c..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.dict; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.util.UUID; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.kylin.common.util.HadoopUtil; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Test; - -/** - * Created by sunyerui on 16/7/12. 
- */ -public class CachedTreeMapTest { - - public static class Key implements WritableComparable { - int keyInt; - - public static Key of(int keyInt) { - Key newKey = new Key(); - newKey.keyInt = keyInt; - return newKey; - } - - @Override - public int compareTo(Object o) { - return keyInt - ((Key)o).keyInt; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(keyInt); - } - - @Override - public void readFields(DataInput in) throws IOException { - keyInt = in.readInt(); - } - - @Override - public String toString() { - return String.valueOf(keyInt); - } - } - - public static boolean VALUE_WRITE_ERROR_TOGGLE = false; - public static class Value implements Writable { - String valueStr; - - public static Value of(String valueStr) { - Value newValue = new Value(); - newValue.valueStr = valueStr; - return newValue; - } - - @Override - public void write(DataOutput out) throws IOException { - if (VALUE_WRITE_ERROR_TOGGLE) { - out.write(new byte[0]); - return; - } - out.writeUTF(valueStr); - } - - @Override - public void readFields(DataInput in) throws IOException { - valueStr = in.readUTF(); - } - } - - public static class CachedFileFilter implements FileFilter { - @Override - public boolean accept(File pathname) { - return pathname.getName().startsWith(CachedTreeMap.CACHED_PREFIX); - } - } - - public static class VersionFilter implements FileFilter { - @Override - public boolean accept(File pathname) { - return pathname.getName().startsWith(CachedTreeMap.VERSION_PREFIX); - } - } - - - static final UUID uuid = UUID.randomUUID(); - static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid; - static final String workingDir = baseDir + "/working"; - - private static void cleanup() { - Path basePath = new Path(baseDir); - try { - HadoopUtil.getFileSystem(basePath).delete(basePath, true); - } catch (IOException e) {} - VALUE_WRITE_ERROR_TOGGLE = false; - } - - @After - public void afterTest() { - cleanup(); - } - - @AfterClass - public static void tearDown() { - cleanup(); - } - - @Test - public void testCachedTreeMap() throws IOException { - CachedTreeMap map = createMutableMap(); - map.put(Key.of(1), Value.of("a")); - map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - map.put(Key.of(4), Value.of("d")); - map.put(Key.of(5), Value.of("e")); - - File dir = new File(workingDir); - assertEquals(3, dir.listFiles(new CachedFileFilter()).length); - - flushAndCommit(map, true, true, false); - assertFalse(new File(workingDir).exists()); - - dir = new File(map.getLatestVersion()); - assertEquals(5, dir.listFiles(new CachedFileFilter()).length); - - CachedTreeMap map2 = createImmutableMap(); - assertEquals(5, map2.size()); - assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr); - - try { - map2.put(Key.of(6), Value.of("f")); - fail("Should be error when put value into immutable map"); - } catch (AssertionError error) {} - } - - @Test - public void testMultiVersions() throws IOException, InterruptedException { - CachedTreeMap map = createMutableMap(); - Thread.sleep(3000); - map.put(Key.of(1), Value.of("a")); - map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - flushAndCommit(map, true, true, false); - - CachedTreeMap map2 = createImmutableMap(); - assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr); - - // re-open dict, append new data - map = createMutableMap(); - map.put(Key.of(4), Value.of("d")); - flushAndCommit(map, true, true, true); - - // new data is not visible for map2 - 
assertNull(map2.get(Key.of(4))); - - // append data, and be visible for new immutable map - map.put(Key.of(5), Value.of("e")); - flushAndCommit(map, true, true, true); - - CachedTreeMap map3 = createImmutableMap(); - assertEquals("d", ((Value)map3.get(Key.of(4))).valueStr); - assertEquals("e", ((Value)map3.get(Key.of(5))).valueStr); - - // Check versions retention - File dir = new File(baseDir); - assertEquals(3, dir.listFiles(new VersionFilter()).length); - } - - @Test - public void testKeepAppend() throws IOException { - CachedTreeMap map = createMutableMap(); - map.put(Key.of(1), Value.of("a")); - map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - map.put(Key.of(4), Value.of("d")); - map.put(Key.of(5), Value.of("e")); - - // flush with keepAppend false, map can't be append - flushAndCommit(map, true, true, false); - // append into map has closed - try { - map.put(Key.of(6), Value.of("f")); - fail(); - } catch (AssertionError e) { - assertEquals("Only support put method with immutable false and keepAppend true", e.getMessage()); - } - - CachedTreeMap map2 = createImmutableMap(); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr); - assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr); - - map = createMutableMap(); - map.put(Key.of(6), Value.of("f")); - map.put(Key.of(7), Value.of("g")); - map.put(Key.of(8), Value.of("h")); - // flush with keepAppend true - flushAndCommit(map, true, true, true); - map.put(Key.of(9), Value.of("i")); - // can still append data - flushAndCommit(map, true, true, false); - - map2 = createImmutableMap(); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr); - assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr); - assertEquals("i", ((Value)map2.get(Key.of(9))).valueStr); - } - - @Test - public void testVersionRetention() throws IOException, InterruptedException { - File dir = new File(baseDir); - // TTL for 3s and keep 3 versions - CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class) - .maxVersions(3).versionTTL(1000 * 3).build(); - map.put(Key.of(1), Value.of("a")); - - // has version 0 when create map - assertEquals(1, dir.listFiles(new VersionFilter()).length); - Thread.sleep(2500); - - // flush version 1 - flushAndCommit(map, true, true, true); - assertEquals(2, dir.listFiles(new VersionFilter()).length); - - // flush version 2 - flushAndCommit(map, true, true, true); - assertEquals(3, dir.listFiles(new VersionFilter()).length); - - // flush version 3 - flushAndCommit(map, true, true, true); - // won't delete version since 3s TTL - assertEquals(4, dir.listFiles(new VersionFilter()).length); - - // sleep to make version 0 expired - Thread.sleep(500); - // flush verion 4 - flushAndCommit(map, true, true, false); - assertEquals(4, dir.listFiles(new VersionFilter()).length); - - // TTL for 100ms and keep 2 versions - map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class) - .maxVersions(2).versionTTL(100).build(); - flushAndCommit(map, true, true, false); - assertEquals(2, dir.listFiles(new VersionFilter()).length); - } - - @Test - public void testWithOldFormat() throws IOException { - File dir = new File(baseDir); - CachedTreeMap map = createMutableMap(); - map.put(Key.of(1), Value.of("a")); - 
map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - map.put(Key.of(4), Value.of("d")); - map.put(Key.of(5), Value.of("e")); - flushAndCommit(map, true, true, true); - - // move version dir to base dir, to simulate the older format - Path versionPath = new Path(map.getLatestVersion()); - Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName()); - FileSystem fs = HadoopUtil.getFileSystem(versionPath); - fs.rename(versionPath, tmpVersionPath); - fs.delete(new Path(baseDir), true); - fs.rename(tmpVersionPath, new Path(baseDir)); - assertEquals(0, dir.listFiles(new VersionFilter()).length); - assertEquals(5, dir.listFiles(new CachedFileFilter()).length); - - CachedTreeMap map2 = createImmutableMap(); - assertEquals(5, map2.size()); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr); - - assertEquals(1, dir.listFiles(new VersionFilter()).length); - assertEquals(0, dir.listFiles(new CachedFileFilter()).length); - } - - @Test - public void testWriteFailed() throws IOException { - // normal case - CachedTreeMap map = createMutableMap(); - map.put(Key.of(1), Value.of("a")); - map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - map.remove(Key.of(3)); - map.put(Key.of(4), Value.of("d")); - - flushAndCommit(map, true, true, false); - - CachedTreeMap map2 = createImmutableMap(); - assertEquals(3, map2.size()); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - - // suppose write value failed and didn't commit data - map = createMutableMap(); - VALUE_WRITE_ERROR_TOGGLE = true; - map.put(Key.of(1), Value.of("aa")); - map.put(Key.of(2), Value.of("bb")); - VALUE_WRITE_ERROR_TOGGLE = false; - map.put(Key.of(3), Value.of("cc")); - map.put(Key.of(4), Value.of("dd")); - // suppose write value failed and didn't commit data - flushAndCommit(map, true, false, false); - - // read map data should not be modified - map2 = createImmutableMap(); - assertEquals(3, map2.size()); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - - assertTrue(new File(workingDir).exists()); - } - - private CachedTreeMap createImmutableMap() throws IOException { - CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - try (DataInputStream in = map.openIndexInput()) { - map.readFields(in); - } - return map; - } - - private CachedTreeMap createMutableMap() throws IOException { - CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .immutable(false).maxSize(2).maxVersions(3).versionTTL(1000 * 3).keyClazz(Key.class).valueClazz(Value.class).build(); - try (DataInputStream in = map.openIndexInput()) { - map.readFields(in); - } catch (IOException e) {} - return map; - } - - private void flushAndCommit(CachedTreeMap map, boolean doFlush, boolean doCommit, boolean keepAppend) throws IOException { - if (doFlush) { - try (DataOutputStream out = map.openIndexOutput()) { - map.write(out); - } - } - - if (doCommit) { - map.commit(keepAppend); - } - } -} - http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java 
b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java new file mode 100644 index 0000000..f6ba75a --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.global; + +import static org.apache.kylin.dict.global.GlobalDictHDFSStore.V2_INDEX_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.dict.AppendTrieDictionary; +import org.apache.kylin.dict.BytesConverter; +import org.apache.kylin.dict.StringBytesConverter; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { + private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID(); + private static String BASE_DIR; + private static String LOCAL_BASE_DIR = "/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/"; + + @Before + public void beforeTest() { + staticCreateTestMetadata(); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); + BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; + } + + @After + public void afterTest() { + cleanup(); + staticCleanupTestMetadata(); + } + + private void cleanup() { + Path basePath = new Path(BASE_DIR); + try { + HadoopUtil.getFileSystem(basePath).delete(basePath, true); + } catch (IOException e) { + } + } + + private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "åå ¸", "åå ¸æ ", "忝", // non-ascii characters + "", // empty + 
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", + "paint", "tar", "try", // some dup + }; + + private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException { + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); + return new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice); + } + + @Test + public void testStringRepeatly() throws IOException { + ArrayList<String> list = new ArrayList<>(); + Collections.addAll(list, words); + ArrayList<String> notfound = new ArrayList<>(); + notfound.add("pa"); + notfound.add("pars"); + notfound.add("tri"); + notfound.add("å"); + for (int i = 0; i < 50; i++) { + testStringDictAppend(list, notfound, true); + //to speed up the test + cleanup(); + } + } + + @Test + public void testEnglishWords() throws Exception { + InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"); + ArrayList<String> str = loadStrings(is); + testStringDictAppend(str, null, false); + } + + @Test + public void testCategoryNames() throws Exception { + InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"); + ArrayList<String> str = loadStrings(is); + testStringDictAppend(str, null, true); + } + + private static ArrayList<String> loadStrings(InputStream is) throws Exception { + ArrayList<String> r = new ArrayList<String>(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + try { + String word; + 
while ((word = reader.readLine()) != null) { + word = word.trim(); + if (word.isEmpty() == false) + r.add(word); + } + } finally { + reader.close(); + is.close(); + } + return r; + } + + @Ignore("need huge key set") + @Test + public void testHugeKeySet() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + + AppendTrieDictionary<String> dict = null; + + InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + try { + String word; + while ((word = reader.readLine()) != null) { + word = word.trim(); + if (!word.isEmpty()) + builder.addValue(word); + } + } finally { + reader.close(); + is.close(); + } + dict = builder.build(0); + dict.dump(System.out); + } + + private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException { + Random rnd = new Random(System.currentTimeMillis()); + ArrayList<String> strList = new ArrayList<String>(); + strList.addAll(list); + if (shuffleList) { + Collections.shuffle(strList, rnd); + } + BytesConverter converter = new StringBytesConverter(); + + AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR); + + TreeMap<Integer, String> checkMap = new TreeMap<>(); + int firstAppend = rnd.nextInt(strList.size() / 2); + int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2); + int appendIndex = 0; + int checkIndex = 0; + + for (; appendIndex < firstAppend; appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + AppendTrieDictionary<String> dict = b.build(0); + dict.dump(System.out); + for (; checkIndex < firstAppend; checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + + // reopen dict and append + b = createBuilder(RESOURCE_DIR); + + for (; appendIndex < secondAppend; appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + AppendTrieDictionary<String> newDict = b.build(0); + assert newDict.equals(dict); + dict = newDict; + dict.dump(System.out); + checkIndex = 0; + for (; checkIndex < secondAppend; checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + if (checkIndex < firstAppend) { + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } else { + // check second append str, should be new id + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + } + + // reopen dict and append rest str + b = createBuilder(RESOURCE_DIR); + + for (; appendIndex < strList.size(); appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + newDict = b.build(0); + assert newDict.equals(dict); + dict = newDict; + dict.dump(System.out); + checkIndex = 0; + for (; checkIndex < strList.size(); checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + 
int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + if (checkIndex < secondAppend) { + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } else { + // check third append str, should be new id + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + } + if (notfound != null) { + for (String s : notfound) { + byte[] bytes = converter.convertToBytes(s); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertEquals(-1, id); + } + } + + dict = testSerialize(dict, converter); + for (String str : strList) { + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } + } + + private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) { + try { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dataout = new DataOutputStream(bout); + dict.write(dataout); + dataout.close(); + ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); + DataInputStream datain = new DataInputStream(bin); + AppendTrieDictionary<String> r = new AppendTrieDictionary<String>(); + r.readFields(datain); + datain.close(); + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testMaxInteger() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.setMaxId(Integer.MAX_VALUE - 2); + builder.addValue("a"); + builder.addValue("ab"); + builder.addValue("acd"); + builder.addValue("ac"); + AppendTrieDictionary dict = builder.build(0); + assertEquals(2147483646, dict.getIdFromValue("a", 0)); + assertEquals(2147483647, dict.getIdFromValue("ab", 0)); + assertEquals(-2147483647, dict.getIdFromValue("ac", 0)); + assertEquals(-2147483648, dict.getIdFromValue("acd", 0)); + } + + @Ignore("Only occurred when value is very long (>8000 bytes)") + @Test + public void testSuperLongValue() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + String value = "a"; + for (int i = 0; i < 10000; i++) { + value += "a"; + try { + builder.addValue(value); + } catch (StackOverflowError e) { + System.out.println("\nstack overflow " + i); + throw e; + } + } + AppendTrieDictionary dictionary = builder.build(0); + dictionary.getMaxId(); + } + + @Test + public void testSplitContainSuperLongValue() throws IOException { + String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testSuperLongValueAsFileName() throws IOException { + String superLongValue = 
"%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testIllegalFileNameValue() throws IOException { + createAppendTrieDict(Arrays.asList("::", ":")); + } + + @Test + public void testSkipAddValue() throws IOException { + createAppendTrieDict(new ArrayList<String>()); + } + + private void createAppendTrieDict(List<String> valueList) throws IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + + for (String value : valueList) { + builder.addValue(value); + } + + builder.build(0); + } + + private static class CachedFileFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith("cached_"); + } + } + + private static class VersionFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX); + } + } + + @Test + public void testMultiVersions() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + AppendTrieDictionary dict = builder.build(0); + + assertEquals(2, dict.getIdFromValue("b")); + + // re-open dict, append new data + builder = createBuilder(RESOURCE_DIR); + builder.addValue("g"); + + // new data is not visible + try { + dict.getIdFromValue("g"); + fail("Value 'g' (g) not exists!"); + } catch (IllegalArgumentException e) { + + } + + // append data, and be visible for new immutable map + builder.addValue("h"); + + AppendTrieDictionary newDict = builder.build(0); + assert newDict.equals(dict); + + assertEquals(7, newDict.getIdFromValue("g")); + assertEquals(8, newDict.getIdFromValue("h")); + + // Check versions retention + File dir = new File(LOCAL_BASE_DIR); + assertEquals(2, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testVersionRetention() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + + //version 1 + builder.build(0); + + // Check versions retention + File dir = new File(LOCAL_BASE_DIR); + assertEquals(1, dir.listFiles(new VersionFilter()).length); + + // sleep to make version 1 expired + Thread.sleep(1200); + + //version 2 + builder = createBuilder(RESOURCE_DIR); + builder.addValue(""); + builder.build(0); + + // Check versions retention + assertEquals(1, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testOldDirFormat() throws IOException { + 
KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + builder.build(0); + + convertDirToOldFormat(BASE_DIR); + + File dir = new File(LOCAL_BASE_DIR); + assertEquals(0, dir.listFiles(new VersionFilter()).length); + assertEquals(3, dir.listFiles(new CachedFileFilter()).length); + + //convert older format to new format when builder init + builder = createBuilder(RESOURCE_DIR); + builder.build(0); + + assertEquals(1, dir.listFiles(new VersionFilter()).length); + } + + private void convertDirToOldFormat(String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = HadoopUtil.getFileSystem(basePath); + + // move version dir to base dir, to simulate the older format + GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); + Long[] versions = store.listAllVersions(); + Path versionPath = store.getVersionDir(versions[versions.length - 1]); + Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName()); + fs.rename(versionPath, tmpVersionPath); + fs.delete(new Path(baseDir), true); + fs.rename(tmpVersionPath, new Path(baseDir)); + } + + @Test + public void testOldIndexFormat() throws IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + builder.build(0); + + convertIndexToOldFormat(BASE_DIR); + + builder = createBuilder(RESOURCE_DIR); + builder.addValue("g"); + builder.addValue("h"); + builder.addValue("i"); + AppendTrieDictionary dict = builder.build(0); + + assertEquals(1, dict.getIdFromValue("a")); + assertEquals(7, dict.getIdFromValue("g")); + } + + private void convertIndexToOldFormat(String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = HadoopUtil.getFileSystem(basePath); + + GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); + Long[] versions = store.listAllVersions(); + GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]); + + //convert v2 index to v1 index + Path versionPath = store.getVersionDir(versions[versions.length - 1]); + Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME); + + fs.delete(v2IndexFile, true); + GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration()); + indexFormatV1.writeIndexFile(versionPath, metadata); + + //convert v2 fileName format to v1 fileName format + for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { + fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey())); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 403abc4..8b6b5aa 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -187,8 +187,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } @Override - public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException { - this.jobLock = jobLock; + public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException { + jobLock = lock; String serverMode = jobEngineConfig.getConfig().getServerMode(); if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) { @@ -205,7 +205,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti this.jobEngineConfig = jobEngineConfig; - if (jobLock.lock() == false) { + if (jobLock.lockJobEngine() == false) { throw new IllegalStateException("Cannot start job scheduler due to lack of job lock"); } @@ -226,7 +226,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti @Override public void shutdown() throws SchedulerException { logger.info("Shutingdown Job Engine ...."); - jobLock.unlock(); + jobLock.unlockJobEngine(); fetcherPool.shutdown(); jobPool.shutdown(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 1f2e958..83f8964 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -18,6 +18,8 @@ package org.apache.kylin.job.impl.threadpool; +import java.io.Closeable; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; @@ -36,6 +38,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.constant.ExecutableConstants; @@ -70,7 +73,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private ExecutorService watchPool; private ExecutorService jobPool; private DefaultContext context; - private DistributedJobLock jobLock; + private DistributedLock jobLock; + private Closeable lockWatch; private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); @@ -81,6 +85,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private JobEngineConfig jobEngineConfig; private final static String SEGMENT_ID = "segmentId"; + public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; //only for it test public static DistributedScheduler getInstance(KylinConfig config) { @@ -149,7 +154,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn private String serverName = getServerName(); - private String getServerName() { + public String getServerName() { String serverName = null; try { serverName = InetAddress.getLocalHost().getHostName(); @@ -177,7 +182,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn public void run() { try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) { String segmentId = executable.getParam(SEGMENT_ID); - if (jobLock.lockWithName(segmentId, serverName)) { + if (jobLock.lockPath(getLockPath(segmentId), serverName)) { logger.info(executable.toString() + " scheduled in server: " + serverName); context.addRunningJob(executable); @@ -205,7 +210,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { if (segmentWithLocks.contains(segmentId)) { logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); - jobLock.unlockWithName(segmentId); + jobLock.unlockPath(getLockPath(segmentId)); segmentWithLocks.remove(segmentId); } } @@ -214,15 +219,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } //when the segment lock released but the segment related job still running, resume the job. - private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock { + private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedLock.Watcher { private String serverName; - public DoWatchImpl(String serverName) { + public WatcherProcessImpl(String serverName) { this.serverName = serverName; } @Override - public void doWatch(String path, String nodeData) { + public void process(String path, String nodeData) { String[] paths = path.split("/"); String segmentId = paths[paths.length - 1]; @@ -233,7 +238,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) { try { logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job"); - if (!jobLock.isHasLocked(segmentId)) { + if (!jobLock.isPathLocked(getLockPath(segmentId))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); break; @@ -260,7 +265,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } @Override - public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException { + public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException { String serverMode = jobEngineConfig.getConfig().getServerMode(); if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) { logger.info("server mode: " + serverMode + ", no need to run job scheduler"); @@ -283,18 +288,18 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn //watch the zookeeper node change, so that when one job server is down, other job servers can take over. 
watchPool = Executors.newFixedThreadPool(1); - DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName); - this.jobLock.watchLock(watchPool, doWatchImpl); + WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName); + lockWatch = this.jobLock.watchPath(getWatchPath(), watchPool, watcherProcess); int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); - resumeAllRunningJobs(); - fetcher = new FetcherRunner(); fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); hasStarted = true; + + resumeAllRunningJobs(); } private void resumeAllRunningJobs() { @@ -303,7 +308,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn AbstractExecutable executable = executableManager.getJob(id); if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) { try { - if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) { + if (!jobLock.isPathLocked(executable.getParam(SEGMENT_ID))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } @@ -314,16 +319,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } } + public String getLockPath(String pathName) { + return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + } + + private String getWatchPath() { + return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + } + @Override public void shutdown() throws SchedulerException { logger.info("Will shut down Job Engine ...."); + try { + lockWatch.close(); + } catch (IOException e) { + throw new SchedulerException(e); + } + releaseAllLocks(); logger.info("The all locks has released"); - watchPool.shutdown(); - logger.info("The watchPool has down"); - fetcherPool.shutdown(); logger.info("The fetcherPool has down"); @@ -333,7 +349,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private void releaseAllLocks() { for (String segmentId : segmentWithLocks) { - jobLock.unlockWithName(segmentId); + jobLock.unlockPath(getLockPath(segmentId)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java index 1c173ec..e5e2a1e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java @@ -18,19 +18,7 @@ package org.apache.kylin.job.lock; -import java.util.concurrent.ExecutorService; +import org.apache.kylin.common.lock.DistributedLock; -public interface DistributedJobLock extends JobLock { - - boolean lockWithName(String name, String serverName); - - boolean isHasLocked(String segmentId); - - void unlockWithName(String name); - - void watchLock(ExecutorService pool, DoWatchLock doWatch); - - public interface DoWatchLock { - void doWatch(String path, String data); - } +public interface DistributedJobLock extends 
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
index bbfb801..1b6b29e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -19,9 +19,12 @@
 package org.apache.kylin.job.lock;
 
 /**
+ * Among a Kylin cluster, usually only one node runs as the job engine and does the scheduling of build jobs.
+ * This interface is for such negotiation.
  */
 public interface JobLock {
-    boolean lock();
+
+    boolean lockJobEngine();
 
-    void unlock();
+    void unlockJobEngine();
 }
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
index cac17b9..73f6192 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -22,12 +22,12 @@ package org.apache.kylin.job.lock;
  */
 public class MockJobLock implements JobLock {
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() {
         return true;
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() {
         return;
     }
 }
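The new JobLock javadoc states the intent: only one node of a cluster should act as the job engine, and lockJobEngine()/unlockJobEngine() negotiate that role. The sketch below is not code from this patch; it only shows the expected call pattern around startup and shutdown, assuming a scheduler stays passive when the lock is denied. MockJobLock above always grants the lock, which is what the local test metadata relies on (see the kylin.properties entry kylin.job.lock=org.apache.kylin.job.lock.MockJobLock further down).

// Hypothetical caller, for illustration only.
public class JobEngineBootstrapSketch {

    private final org.apache.kylin.job.lock.JobLock jobLock;

    public JobEngineBootstrapSketch(org.apache.kylin.job.lock.JobLock jobLock) {
        this.jobLock = jobLock;
    }

    public void start() {
        if (!jobLock.lockJobEngine()) {
            // another node already owns the job engine role; stay passive
            return;
        }
        // ... start fetcher and job pools here ...
    }

    public void stop() {
        // ... stop pools ...
        jobLock.unlockJobEngine();
    }
}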
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index daa1053..08a8cb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -156,9 +156,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.model.SelectRule.class);
         kyroClasses.add(org.apache.kylin.cube.model.v1_4_0.CubeDesc.class);
         kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictNode.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSlice.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSliceKey.class);
         kyroClasses.add(org.apache.kylin.dict.CacheDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DateStrDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DictionaryInfo.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 9f7b24c..969e466 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -136,4 +136,4 @@
 kylin.engine.mr.config-override.test2=test2
 kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
-kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
new file mode 100644
index 0000000..df2ebf7
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
+    private DictionaryInfo dictionaryInfo;
+
+    @Before
+    public void beforeTest() throws Exception {
+        staticCreateTestMetadata();
+        dictionaryInfo = new DictionaryInfo("testTable", "testColumn", 0, "String", null);
+    }
+
+    @After
+    public void afterTest() {
+        cleanup();
+        staticCleanupTestMetadata();
+    }
+
+    private void cleanup() {
+        String BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/";
+        Path basePath = new Path(BASE_DIR);
+        try {
+            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
+        } catch (IOException e) {
+        }
+    }
+
+    @Test
+    public void testGlobalDictLock() throws IOException, InterruptedException {
+        final CountDownLatch startLatch = new CountDownLatch(3);
+        final CountDownLatch finishLatch = new CountDownLatch(3);
+
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
+        t1.start();
+        t2.start();
+        t3.start();
+        startLatch.await();
+        finishLatch.await();
+
+        GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+        builder.init(dictionaryInfo, 0);
+        builder.addValue("success");
+        Dictionary<String> dict = builder.build();
+
+        for (int i = 0; i < 10000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+        }
+        for (int i = 0; i < 10; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+        }
+        for (int i = 0; i < 100000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+        }
+
+        assertEquals(110011, dict.getIdFromValue("success"));
+    }
+
+    private class SharedBuilderThread extends Thread {
+        CountDownLatch startLatch;
+        CountDownLatch finishLatch;
+        String prefix;
+        int count;
+
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+            this.prefix = prefix;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+            try {
+                GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+                startLatch.countDown();
+
+                builder.init(dictionaryInfo, 0);
+                for (int i = 0; i < count; i++) {
+                    builder.addValue(prefix + i);
+                }
+                builder.build();
+                finishLatch.countDown();
+            } catch (IOException e) {
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2d79970..b4ac42f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
-        return jobLock.lockWithName(cubeName, serverName);
+        return jobLock.lockPath(getLockPath(cubeName), serverName);
     }
 
     private static void initZk() {
@@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     private String getLockPath(String pathName) {
-        return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
+        return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
    }
}
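A note on the expected id in ITGlobalDictionaryBuilderTest above: the three SharedBuilderThreads add 10000 + 10 + 100000 distinct values, and the final assertion expects "success" to receive id 110011. That only holds if the global dictionary assigns ids sequentially starting from 1, with no gaps or duplicates even though the three builders run concurrently; the 1-based start is implied by the assertion rather than stated anywhere else in this patch. A worked check of the arithmetic:

// Why the test expects id 110011 for "success".
int t1Values = 10000;   // values added by thread t1
int t2Values = 10;      // values added by thread t2
int t3Values = 100000;  // values added by thread t3
// Assuming ids are handed out sequentially from 1, "success" is the next value after all of them.
int expectedIdOfSuccess = t1Values + t2Values + t3Values + 1;  // = 110011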