KYLIN-2945 global dict specific info change to use relative path
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1c88db54 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1c88db54 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1c88db54 Branch: refs/heads/2.2.x Commit: 1c88db542a6f2bb4282282510ffdbe3cca728d98 Parents: 6f54dac Author: Yifei Wu <yifei.wu@kyligencedeMacBook-Pro.local> Authored: Wed Oct 18 16:15:37 2017 +0800 Committer: Yifei Wu <yifei.wu@Yifei.local> Committed: Fri Oct 20 10:11:38 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 5 +- .../org/apache/kylin/common/StorageURL.java | 2 +- .../common/persistence/FileResourceStore.java | 6 +- .../apache/kylin/dict/AppendTrieDictionary.java | 53 +- .../global/AppendTrieDictionaryBuilder.java | 2 +- .../kylin/dict/global/GlobalDictHDFSStore.java | 6 +- .../kylin/dict/AppendTrieDictionaryTest.java | 577 +++++++++++++++++++ .../dict/global/AppendTrieDictionaryTest.java | 525 ----------------- .../engine/mr/common/AbstractHadoopJob.java | 9 +- .../spark/SparkBatchCubingJobBuilder2.java | 9 +- .../kylin/storage/hdfs/HDFSResourceStore.java | 2 +- .../hdfs/IdentifierFileResourceStore.java | 51 ++ 12 files changed, 704 insertions(+), 543 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ee05d69..56c7b1f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -29,12 +29,12 @@ import java.util.regex.Matcher; import 
java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.lock.DistributedLockFactory; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.CliCommandExecutor; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.ZooKeeperUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +205,7 @@ abstract public class KylinConfigBase implements Serializable { // make sure path is qualified try { - FileSystem fs = path.getFileSystem(new Configuration()); + FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration()); path = fs.makeQualified(path); } catch (IOException e) { throw new RuntimeException(e); @@ -272,6 +272,7 @@ abstract public class KylinConfigBase implements Serializable { r.put("", "org.apache.kylin.common.persistence.FileResourceStore"); r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore"); r.put("hdfs", "org.apache.kylin.storage.hdfs.HDFSResourceStore"); + r.put("ifile", "org.apache.kylin.storage.hdfs.IdentifierFileResourceStore"); r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html return r; } http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-common/src/main/java/org/apache/kylin/common/StorageURL.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/StorageURL.java b/core-common/src/main/java/org/apache/kylin/common/StorageURL.java index cebbc27..2d3e04c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/StorageURL.java +++ b/core-common/src/main/java/org/apache/kylin/common/StorageURL.java @@ -99,7 +99,7 @@ public class StorageURL { this.params = 
ImmutableMap.copyOf(m); } - private StorageURL(String identifier, String scheme, Map<String, String> params) { + public StorageURL(String identifier, String scheme, Map<String, String> params) { this.identifier = identifier; this.scheme = scheme; this.params = ImmutableMap.copyOf(params); http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index c32556a..12c8aba 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -45,12 +45,16 @@ public class FileResourceStore extends ResourceStore { public FileResourceStore(KylinConfig kylinConfig) { super(kylinConfig); - root = new File(kylinConfig.getMetadataUrl().getIdentifier()).getAbsoluteFile(); + root = new File(getPath(kylinConfig)).getAbsoluteFile(); if (root.exists() == false) throw new IllegalArgumentException( "File not exist by '" + kylinConfig.getMetadataUrl() + "': " + root.getAbsolutePath()); } + protected String getPath(KylinConfig kylinConfig) { + return kylinConfig.getMetadataUrl().getIdentifier(); + } + @Override protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException { synchronized (FileResourceStore.class) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java index 
1855274..9e68eb4 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java @@ -60,19 +60,20 @@ import com.google.common.cache.RemovalNotification; * * @author sunyerui */ -@SuppressWarnings({ "rawtypes", "unchecked", "serial" }) +@SuppressWarnings({"rawtypes", "unchecked", "serial"}) public class AppendTrieDictionary<T> extends CacheDictionary<T> { - public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict" + public static final byte[] HEAD_MAGIC = new byte[]{0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74}; // "AppendTrieDict" public static final int HEAD_SIZE_I = HEAD_MAGIC.length; private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class); + transient private Boolean isSaveAbsolutePath = false; transient private String baseDir; transient private GlobalDictMetadata metadata; transient private LoadingCache<AppendDictSliceKey, AppendDictSlice> dictCache; public void init(String baseDir) throws IOException { - this.baseDir = baseDir; - final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(baseDir); + this.baseDir = convertToAbsolutePath(baseDir); + final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(this.baseDir); Long[] versions = globalDictStore.listAllVersions(); if (versions.length == 0) { @@ -151,7 +152,7 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> { @Override public void write(DataOutput out) throws IOException { - out.writeUTF(baseDir); + out.writeUTF(convertToRelativePath(baseDir)); } @Override @@ -190,4 +191,46 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> { public boolean contains(Dictionary other) { return false; } + + /** + * JIRA: https://issues.apache.org/jira/browse/KYLIN-2945 + * if pass a absolute path, it 
may produce some problems like cannot find global dict after migration. + * so convert to relative path can avoid it and be better to maintain flexibility. + * + */ + private String convertToRelativePath(String path) { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory(); + if (!isSaveAbsolutePath && path.startsWith(hdfsWorkingDir)) { + return path.substring(hdfsWorkingDir.length()); + } + return path; + } + + private String convertToAbsolutePath(String path) { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Path basicPath = new Path(path); + if (basicPath.toUri().getScheme() == null) + return kylinConfig.getHdfsWorkingDirectory() + path; + + String[] paths = path.split("/resources/GlobalDict/"); + if (paths.length == 2) + return kylinConfig.getHdfsWorkingDirectory() + "/resources/GlobalDict/" + paths[1]; + + paths = path.split("/resources/SegmentDict/"); + if (paths.length == 2) { + return kylinConfig.getHdfsWorkingDirectory() + "/resources/SegmentDict/" + paths[1]; + } else { + throw new RuntimeException("the basic directory of global dictionary only support the format which contains '/resources/GlobalDict/' or '/resources/SegmentDict/'"); + } + } + + /** + * only for test + * + * @param flag + */ + void setSaveAbsolutePath(Boolean flag) { + this.isSaveAbsolutePath = flag; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java index 54978c2..a961527 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java +++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java @@ -278,7 +278,7 @@ public class AppendTrieDictionaryBuilder { } // Only used for test - void setMaxId(int id) { + public void setMaxId(int id) { this.maxId = id; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java index bad427c..ec79f2c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java @@ -46,8 +46,8 @@ public class GlobalDictHDFSStore extends GlobalDictStore { static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class); static final String V1_INDEX_NAME = ".index"; - static final String V2_INDEX_NAME = ".index_v2"; - static final String VERSION_PREFIX = "version_"; + public static final String V2_INDEX_NAME = ".index_v2"; + public static final String VERSION_PREFIX = "version_"; static final int BUFFER_SIZE = 8 * 1024 * 1024; private final Path basePath; @@ -295,7 +295,7 @@ public class GlobalDictHDFSStore extends GlobalDictStore { protected final FileSystem fs; protected final Configuration conf; - protected IndexFormatV1(FileSystem fs, Configuration conf) { + public IndexFormatV1(FileSystem fs, Configuration conf) { this.fs = fs; this.conf = conf; } http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java 
b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java new file mode 100644 index 0000000..36ca66e --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.dict; + +import static org.apache.kylin.dict.global.GlobalDictHDFSStore.V2_INDEX_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.dict.global.AppendDictSliceKey; +import org.apache.kylin.dict.global.AppendTrieDictionaryBuilder; +import org.apache.kylin.dict.global.GlobalDictHDFSStore; +import org.apache.kylin.dict.global.GlobalDictMetadata; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { + private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID(); + private static String BASE_DIR; + private static String LOCAL_BASE_DIR; + + @Before + public void beforeTest() { + staticCreateTestMetadata(); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); + BASE_DIR = 
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; + LOCAL_BASE_DIR = getLocalWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; + } + + @After + public void afterTest() { + cleanup(); + staticCleanupTestMetadata(); + } + + private void cleanup() { + Path basePath = new Path(BASE_DIR); + try { + HadoopUtil.getFileSystem(basePath).delete(basePath, true); + } catch (IOException e) { + } + } + + private static final String[] words = new String[]{"paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "åå ¸", "åå ¸æ ", "忝", // non-ascii characters + "", // empty + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", + "paint", "tar", "try", // some dup + }; + + private AppendTrieDictionaryBuilder createBuilder() throws IOException { + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); + return new AppendTrieDictionaryBuilder(BASE_DIR, maxEntriesPerSlice, true); + } + + @Test + public void testStringRepeatly() throws IOException { + ArrayList<String> list = new ArrayList<>(); + Collections.addAll(list, words); + ArrayList<String> notfound = new ArrayList<>(); + notfound.add("pa"); + notfound.add("pars"); + notfound.add("tri"); + notfound.add("å"); + for (int i = 0; i < 50; i++) { + testStringDictAppend(list, notfound, true); + //to speed up the test + cleanup(); + } + } + + @Test + public void testEnglishWords() throws Exception { + InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"); + ArrayList<String> str = loadStrings(is); + testStringDictAppend(str, null, false); + } + + @Test + public void testCategoryNames() throws Exception { + InputStream is = new 
FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"); + ArrayList<String> str = loadStrings(is); + testStringDictAppend(str, null, true); + } + + private static ArrayList<String> loadStrings(InputStream is) throws Exception { + ArrayList<String> r = new ArrayList<String>(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + try { + String word; + while ((word = reader.readLine()) != null) { + word = word.trim(); + if (word.isEmpty() == false) + r.add(word); + } + } finally { + reader.close(); + is.close(); + } + return r; + } + + @Ignore("need huge key set") + @Test + public void testHugeKeySet() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(); + + AppendTrieDictionary<String> dict = null; + + InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + try { + String word; + while ((word = reader.readLine()) != null) { + word = word.trim(); + if (!word.isEmpty()) + builder.addValue(word); + } + } finally { + reader.close(); + is.close(); + } + dict = builder.build(0); + dict.dump(System.out); + } + + private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException { + Random rnd = new Random(System.currentTimeMillis()); + ArrayList<String> strList = new ArrayList<String>(); + strList.addAll(list); + if (shuffleList) { + Collections.shuffle(strList, rnd); + } + BytesConverter converter = new StringBytesConverter(); + + AppendTrieDictionaryBuilder b = createBuilder(); + + TreeMap<Integer, String> checkMap = new TreeMap<>(); + int firstAppend = rnd.nextInt(strList.size() / 2); + int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2); + int appendIndex = 0; + int checkIndex = 0; + + for (; appendIndex < firstAppend; appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + AppendTrieDictionary<String> 
dict = b.build(0); + dict.dump(System.out); + for (; checkIndex < firstAppend; checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + + // reopen dict and append + b = createBuilder(); + + for (; appendIndex < secondAppend; appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + AppendTrieDictionary<String> newDict = b.build(0); + assert newDict.equals(dict); + dict = newDict; + dict.dump(System.out); + checkIndex = 0; + for (; checkIndex < secondAppend; checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + if (checkIndex < firstAppend) { + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } else { + // check second append str, should be new id + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + } + + // reopen dict and append rest str + b = createBuilder(); + + for (; appendIndex < strList.size(); appendIndex++) { + b.addValue(strList.get(appendIndex)); + } + newDict = b.build(0); + assert newDict.equals(dict); + dict = newDict; + dict.dump(System.out); + checkIndex = 0; + for (; checkIndex < strList.size(); checkIndex++) { + String str = strList.get(checkIndex); + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + 
assertNotEquals(String.format("Value %s not exist", str), -1, id); + if (checkIndex < secondAppend) { + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } else { + // check third append str, should be new id + assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); + checkMap.put(id, str); + } + } + if (notfound != null) { + for (String s : notfound) { + byte[] bytes = converter.convertToBytes(s); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertEquals(-1, id); + } + } + + dict = testSerialize(dict, converter); + for (String str : strList) { + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } + } + + private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) { + try { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dataout = new DataOutputStream(bout); + dict.write(dataout); + dataout.close(); + ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); + DataInputStream datain = new DataInputStream(bin); + AppendTrieDictionary<String> r = new AppendTrieDictionary<String>(); + r.readFields(datain); + datain.close(); + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testMaxInteger() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.setMaxId(Integer.MAX_VALUE - 2); + builder.addValue("a"); + builder.addValue("ab"); + builder.addValue("acd"); + builder.addValue("ac"); + AppendTrieDictionary dict = builder.build(0); + assertEquals(2147483646, 
dict.getIdFromValue("a", 0)); + assertEquals(2147483647, dict.getIdFromValue("ab", 0)); + assertEquals(-2147483647, dict.getIdFromValue("ac", 0)); + assertEquals(-2147483648, dict.getIdFromValue("acd", 0)); + } + + @Ignore("Only occurred when value is very long (>8000 bytes)") + @Test + public void testSuperLongValue() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(); + String value = "a"; + for (int i = 0; i < 10000; i++) { + value += "a"; + try { + builder.addValue(value); + } catch (StackOverflowError e) { + System.out.println("\nstack overflow " + i); + throw e; + } + } + AppendTrieDictionary dictionary = builder.build(0); + dictionary.getMaxId(); + } + + @Test + public void testSplitContainSuperLongValue() throws IOException { + String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testSuperLongValueAsFileName() throws IOException { + String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testIllegalFileNameValue() throws IOException { + createAppendTrieDict(Arrays.asList("::", ":")); + } + + @Test + public void testSkipAddValue() throws IOException { + createAppendTrieDict(new 
ArrayList<String>()); + } + + @Test + public void testSerialize() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(); + AppendTrieDictionary dict = builder.build(0); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dataout = new DataOutputStream(bout); + dict.write(dataout); + dataout.close(); + ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); + DataInputStream datain = new DataInputStream(bin); + + assertNull(new Path(datain.readUTF()).toUri().getScheme()); + datain.close(); + } + + @Test + public void testDeserialize() throws IOException { + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.setMaxId(Integer.MAX_VALUE - 2); + builder.addValue("a"); + builder.addValue("ab"); + List<String> strList = Lists.newArrayList("a", "ab"); + AppendTrieDictionary dict = builder.build(0); + TreeMap checkMap = new TreeMap(); + BytesConverter converter = new StringBytesConverter(); + for (String str: strList) { + byte[] bytes = converter.convertToBytes(str); + int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + checkMap.put(id, str); + } + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dataout = new DataOutputStream(bout); + dict.setSaveAbsolutePath(true); + dict.write(dataout); + dataout.close(); + ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); + DataInputStream datain = new DataInputStream(bin); + AppendTrieDictionary<String> r = new AppendTrieDictionary<String>(); + r.readFields(datain); + datain.close(); + + for (String str : strList) { + byte[] bytes = converter.convertToBytes(str); + int id = r.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); + assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); + } + } + + private void createAppendTrieDict(List<String> valueList) throws 
IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1"); + + AppendTrieDictionaryBuilder builder = createBuilder(); + + for (String value : valueList) { + builder.addValue(value); + } + + builder.build(0); + } + + private static class CachedFileFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith("cached_"); + } + } + + private static class VersionFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX); + } + } + + @Test + public void testMultiVersions() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + AppendTrieDictionary dict = builder.build(0); + + assertEquals(2, dict.getIdFromValue("b")); + + // re-open dict, append new data + builder = createBuilder(); + builder.addValue("g"); + + // new data is not visible + try { + dict.getIdFromValue("g"); + fail("Value 'g' (g) not exists!"); + } catch (IllegalArgumentException e) { + + } + + // append data, and be visible for new immutable map + builder.addValue("h"); + + AppendTrieDictionary newDict = builder.build(0); + assert newDict.equals(dict); + + assertEquals(7, newDict.getIdFromValue("g")); + assertEquals(8, newDict.getIdFromValue("h")); + + // Check versions retention + File dir = new File(LOCAL_BASE_DIR); + assertEquals(2, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testVersionRetention() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + 
KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000"); + + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.addValue("a"); + + //version 1 + builder.build(0); + + // Check versions retention + File dir = new File(LOCAL_BASE_DIR); + assertEquals(1, dir.listFiles(new VersionFilter()).length); + + // sleep to make version 1 expired + Thread.sleep(1200); + + //version 2 + builder = createBuilder(); + builder.addValue(""); + builder.build(0); + + // Check versions retention + assertEquals(1, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testOldDirFormat() throws IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + builder.build(0); + + convertDirToOldFormat(BASE_DIR); + + File dir = new File(LOCAL_BASE_DIR); + assertEquals(0, dir.listFiles(new VersionFilter()).length); + assertEquals(3, dir.listFiles(new CachedFileFilter()).length); + + //convert older format to new format when builder init + builder = createBuilder(); + builder.build(0); + + assertEquals(1, dir.listFiles(new VersionFilter()).length); + } + + private void convertDirToOldFormat(String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = HadoopUtil.getFileSystem(basePath); + + // move version dir to base dir, to simulate the older format + GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); + Long[] versions = store.listAllVersions(); + Path versionPath = store.getVersionDir(versions[versions.length - 1]); + Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName()); + fs.rename(versionPath, 
tmpVersionPath); + fs.delete(new Path(baseDir), true); + fs.rename(tmpVersionPath, new Path(baseDir)); + } + + @Test + public void testOldIndexFormat() throws IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + builder.build(0); + + convertIndexToOldFormat(BASE_DIR); + + builder = createBuilder(); + builder.addValue("g"); + builder.addValue("h"); + builder.addValue("i"); + AppendTrieDictionary dict = builder.build(0); + + assertEquals(1, dict.getIdFromValue("a")); + assertEquals(7, dict.getIdFromValue("g")); + } + + private void convertIndexToOldFormat(String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = HadoopUtil.getFileSystem(basePath); + + GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); + Long[] versions = store.listAllVersions(); + GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]); + + //convert v2 index to v1 index + Path versionPath = store.getVersionDir(versions[versions.length - 1]); + Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME); + + fs.delete(v2IndexFile, true); + GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration()); + indexFormatV1.writeIndexFile(versionPath, metadata); + + //convert v2 fileName format to v1 fileName format + for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { + fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey())); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java 
---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java deleted file mode 100644 index 6b39c36..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java +++ /dev/null @@ -1,525 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.dict.global; - -import static org.apache.kylin.dict.global.GlobalDictHDFSStore.V2_INDEX_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.fail; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.TreeMap; -import java.util.UUID; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.AppendTrieDictionary; -import org.apache.kylin.dict.BytesConverter; -import org.apache.kylin.dict.StringBytesConverter; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { - private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID(); - private static String BASE_DIR; - private static String LOCAL_BASE_DIR; - - @Before - public void beforeTest() { - staticCreateTestMetadata(); - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); - BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; - LOCAL_BASE_DIR = getLocalWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; - } - - @After - 
public void afterTest() { - cleanup(); - staticCleanupTestMetadata(); - } - - private void cleanup() { - Path basePath = new Path(BASE_DIR); - try { - HadoopUtil.getFileSystem(basePath).delete(basePath, true); - } catch (IOException e) { - } - } - - private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "åå ¸", "åå ¸æ ", "忝", // non-ascii characters - "", // empty - "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", - "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + 
"dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" - + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", - "paint", "tar", "try", // some dup - }; - - private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException { - int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); - String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + resourceDir + "/"; - return new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true); - } - - @Test - public void testStringRepeatly() throws IOException { - ArrayList<String> list = new ArrayList<>(); - Collections.addAll(list, words); - ArrayList<String> notfound = new ArrayList<>(); - notfound.add("pa"); - notfound.add("pars"); - notfound.add("tri"); - notfound.add("字"); - for (int i = 0; i < 50; i++) { - testStringDictAppend(list, notfound, true); - //to speed up the test - cleanup(); - } - } - - @Test - public void testEnglishWords() throws Exception { - InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"); - ArrayList<String> str = loadStrings(is); - testStringDictAppend(str, null, false); - } - - @Test - public void testCategoryNames() throws Exception { - InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"); - 
ArrayList<String> str = loadStrings(is); - testStringDictAppend(str, null, true); - } - - private static ArrayList<String> loadStrings(InputStream is) throws Exception { - ArrayList<String> r = new ArrayList<String>(); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - try { - String word; - while ((word = reader.readLine()) != null) { - word = word.trim(); - if (word.isEmpty() == false) - r.add(word); - } - } finally { - reader.close(); - is.close(); - } - return r; - } - - @Ignore("need huge key set") - @Test - public void testHugeKeySet() throws IOException { - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - - AppendTrieDictionary<String> dict = null; - - InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - try { - String word; - while ((word = reader.readLine()) != null) { - word = word.trim(); - if (!word.isEmpty()) - builder.addValue(word); - } - } finally { - reader.close(); - is.close(); - } - dict = builder.build(0); - dict.dump(System.out); - } - - private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException { - Random rnd = new Random(System.currentTimeMillis()); - ArrayList<String> strList = new ArrayList<String>(); - strList.addAll(list); - if (shuffleList) { - Collections.shuffle(strList, rnd); - } - BytesConverter converter = new StringBytesConverter(); - - AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR); - - TreeMap<Integer, String> checkMap = new TreeMap<>(); - int firstAppend = rnd.nextInt(strList.size() / 2); - int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2); - int appendIndex = 0; - int checkIndex = 0; - - for (; appendIndex < firstAppend; appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - AppendTrieDictionary<String> dict = b.build(0); - dict.dump(System.out); - for (; 
checkIndex < firstAppend; checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - - // reopen dict and append - b = createBuilder(RESOURCE_DIR); - - for (; appendIndex < secondAppend; appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - AppendTrieDictionary<String> newDict = b.build(0); - assert newDict.equals(dict); - dict = newDict; - dict.dump(System.out); - checkIndex = 0; - for (; checkIndex < secondAppend; checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - if (checkIndex < firstAppend) { - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } else { - // check second append str, should be new id - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - } - - // reopen dict and append rest str - b = createBuilder(RESOURCE_DIR); - - for (; appendIndex < strList.size(); appendIndex++) { - b.addValue(strList.get(appendIndex)); - } - newDict = b.build(0); - assert newDict.equals(dict); - dict = newDict; - dict.dump(System.out); - checkIndex = 0; - for (; checkIndex < strList.size(); checkIndex++) { - String str = strList.get(checkIndex); - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s 
not exist", str), -1, id); - if (checkIndex < secondAppend) { - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } else { - // check third append str, should be new id - assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); - checkMap.put(id, str); - } - } - if (notfound != null) { - for (String s : notfound) { - byte[] bytes = converter.convertToBytes(s); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertEquals(-1, id); - } - } - - dict = testSerialize(dict, converter); - for (String str : strList) { - byte[] bytes = converter.convertToBytes(str); - int id = dict.getIdFromValueBytesWithoutCache(bytes, 0, bytes.length, 0); - assertNotEquals(String.format("Value %s not exist", str), -1, id); - assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); - } - } - - private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) { - try { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dataout = new DataOutputStream(bout); - dict.write(dataout); - dataout.close(); - ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); - DataInputStream datain = new DataInputStream(bin); - AppendTrieDictionary<String> r = new AppendTrieDictionary<String>(); - r.readFields(datain); - datain.close(); - return r; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testMaxInteger() throws IOException { - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - builder.setMaxId(Integer.MAX_VALUE - 2); - builder.addValue("a"); - builder.addValue("ab"); - builder.addValue("acd"); - builder.addValue("ac"); - AppendTrieDictionary dict = builder.build(0); - assertEquals(2147483646, dict.getIdFromValue("a", 0)); 
- assertEquals(2147483647, dict.getIdFromValue("ab", 0)); - assertEquals(-2147483647, dict.getIdFromValue("ac", 0)); - assertEquals(-2147483648, dict.getIdFromValue("acd", 0)); - } - - @Ignore("Only occurred when value is very long (>8000 bytes)") - @Test - public void testSuperLongValue() throws IOException { - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - String value = "a"; - for (int i = 0; i < 10000; i++) { - value += "a"; - try { - builder.addValue(value); - } catch (StackOverflowError e) { - System.out.println("\nstack overflow " + i); - throw e; - } - } - AppendTrieDictionary dictionary = builder.build(0); - dictionary.getMaxId(); - } - - @Test - public void testSplitContainSuperLongValue() throws IOException { - String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; - - createAppendTrieDict(Arrays.asList("a", superLongValue)); - } - - @Test - public void testSuperLongValueAsFileName() throws IOException { - String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; - - createAppendTrieDict(Arrays.asList("a", superLongValue)); - } - - @Test - public void testIllegalFileNameValue() throws IOException { - createAppendTrieDict(Arrays.asList("::", ":")); - } - - @Test - public void testSkipAddValue() throws IOException { - createAppendTrieDict(new ArrayList<String>()); 
- } - - private void createAppendTrieDict(List<String> valueList) throws IOException { - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1"); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - - for (String value : valueList) { - builder.addValue(value); - } - - builder.build(0); - } - - private static class CachedFileFilter implements FileFilter { - @Override - public boolean accept(File pathname) { - return pathname.getName().startsWith("cached_"); - } - } - - private static class VersionFilter implements FileFilter { - @Override - public boolean accept(File pathname) { - return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX); - } - } - - @Test - public void testMultiVersions() throws IOException, InterruptedException { - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - builder.addValue("a"); - builder.addValue("b"); - builder.addValue("c"); - builder.addValue("d"); - builder.addValue("e"); - builder.addValue("f"); - AppendTrieDictionary dict = builder.build(0); - - assertEquals(2, dict.getIdFromValue("b")); - - // re-open dict, append new data - builder = createBuilder(RESOURCE_DIR); - builder.addValue("g"); - - // new data is not visible - try { - dict.getIdFromValue("g"); - fail("Value 'g' (g) not exists!"); - } catch (IllegalArgumentException e) { - - } - - // append data, and be visible for new immutable map - builder.addValue("h"); - - AppendTrieDictionary newDict = builder.build(0); - assert newDict.equals(dict); - - assertEquals(7, newDict.getIdFromValue("g")); - assertEquals(8, newDict.getIdFromValue("h")); - - // Check versions retention - File dir = new File(LOCAL_BASE_DIR); - assertEquals(2, dir.listFiles(new VersionFilter()).length); - } - - @Test - public void testVersionRetention() throws IOException, InterruptedException { - 
KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1"); - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000"); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - builder.addValue("a"); - - //version 1 - builder.build(0); - - // Check versions retention - File dir = new File(LOCAL_BASE_DIR); - assertEquals(1, dir.listFiles(new VersionFilter()).length); - - // sleep to make version 1 expired - Thread.sleep(1200); - - //version 2 - builder = createBuilder(RESOURCE_DIR); - builder.addValue(""); - builder.build(0); - - // Check versions retention - assertEquals(1, dir.listFiles(new VersionFilter()).length); - } - - @Test - public void testOldDirFormat() throws IOException { - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - builder.addValue("a"); - builder.addValue("b"); - builder.addValue("c"); - builder.addValue("d"); - builder.addValue("e"); - builder.addValue("f"); - builder.build(0); - - convertDirToOldFormat(BASE_DIR); - - File dir = new File(LOCAL_BASE_DIR); - assertEquals(0, dir.listFiles(new VersionFilter()).length); - assertEquals(3, dir.listFiles(new CachedFileFilter()).length); - - //convert older format to new format when builder init - builder = createBuilder(RESOURCE_DIR); - builder.build(0); - - assertEquals(1, dir.listFiles(new VersionFilter()).length); - } - - private void convertDirToOldFormat(String baseDir) throws IOException { - Path basePath = new Path(baseDir); - FileSystem fs = HadoopUtil.getFileSystem(basePath); - - // move version dir to base dir, to simulate the older format - GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); - Long[] versions = store.listAllVersions(); - Path versionPath = store.getVersionDir(versions[versions.length 
- 1]); - Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName()); - fs.rename(versionPath, tmpVersionPath); - fs.delete(new Path(baseDir), true); - fs.rename(tmpVersionPath, new Path(baseDir)); - } - - @Test - public void testOldIndexFormat() throws IOException { - KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - builder.addValue("a"); - builder.addValue("b"); - builder.addValue("c"); - builder.addValue("d"); - builder.addValue("e"); - builder.addValue("f"); - builder.build(0); - - convertIndexToOldFormat(BASE_DIR); - - builder = createBuilder(RESOURCE_DIR); - builder.addValue("g"); - builder.addValue("h"); - builder.addValue("i"); - AppendTrieDictionary dict = builder.build(0); - - assertEquals(1, dict.getIdFromValue("a")); - assertEquals(7, dict.getIdFromValue("g")); - } - - private void convertIndexToOldFormat(String baseDir) throws IOException { - Path basePath = new Path(baseDir); - FileSystem fs = HadoopUtil.getFileSystem(basePath); - - GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir); - Long[] versions = store.listAllVersions(); - GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]); - - //convert v2 index to v1 index - Path versionPath = store.getVersionDir(versions[versions.length - 1]); - Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME); - - fs.delete(v2IndexFile, true); - GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration()); - indexFormatV1.writeIndexFile(versionPath, metadata); - - //convert v2 fileName format to v1 fileName format - for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) { - fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey())); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index babf69b..fcedcc1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -57,6 +57,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; @@ -452,7 +453,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath()); logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath()); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - kylinConfig.setMetadataUrl(metaDir.getAbsolutePath()); + Map<String, String> paramsMap = new HashMap<>(); + paramsMap.put("path", metaDir.getAbsolutePath()); + StorageURL storageURL = new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "ifile", paramsMap); + kylinConfig.setMetadataUrl(storageURL.toString()); return kylinConfig; } else { return KylinConfig.getInstanceFromEnv(); @@ -469,8 +473,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { logger.info("Ready to load KylinConfig from uri: {}", uri); KylinConfig config; FileSystem fs; - int cut = uri.indexOf('@'); - String realHdfsPath = uri.substring(0, cut) + "/" + KylinConfig.KYLIN_CONF_PROPERTIES_FILE; + 
String realHdfsPath = StorageURL.valueOf(uri).getParameter("path") + "/" + KylinConfig.KYLIN_CONF_PROPERTIES_FILE; try { fs = HadoopUtil.getFileSystem(realHdfsPath); InputStream is = fs.open(new Path(realHdfsPath)); http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 47ea3d0..7d76ce4 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.spark; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; @@ -30,6 +31,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + /** */ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { @@ -89,6 +93,9 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { } private static String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) { - return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs"; + Map<String, String> param = new HashMap<>(); + param.put("path", kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID); + return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString(); +// return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs"; } } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index d185f4e..720e7e2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -58,7 +58,7 @@ public class HDFSResourceStore extends ResourceStore { StorageURL metadataUrl = kylinConfig.getMetadataUrl(); Preconditions.checkState(HDFS_SCHEME.equals(metadataUrl.getScheme())); - String path = metadataUrl.getIdentifier(); + String path = metadataUrl.getParameter("path"); fs = HadoopUtil.getFileSystem(path); Path metadataPath = new Path(path); if (fs.exists(metadataPath) == false) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1c88db54/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/IdentifierFileResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/IdentifierFileResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/IdentifierFileResourceStore.java new file mode 100644 index 0000000..b23a916 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/IdentifierFileResourceStore.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.persistence.FileResourceStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * it need identifier to transfer relative path to absolute path when building cube like reading global dict and + * using path to locate the data and make use of it, so this ResourceStore separate identifier from the data path + * saved in params. + * + */ +public class IdentifierFileResourceStore extends FileResourceStore { + private static final Logger logger = LoggerFactory.getLogger(IdentifierFileResourceStore.class); + + private static final String IFILE_SCHEME = "ifile"; + + private File root; + + public IdentifierFileResourceStore(KylinConfig kylinConfig) throws Exception { + super(kylinConfig); + } + + protected String getPath(KylinConfig kylinConfig) { + StorageURL metadataUrl = kylinConfig.getMetadataUrl(); + Preconditions.checkState(IFILE_SCHEME.equals(metadataUrl.getScheme())); + return metadataUrl.getParameter("path"); + } +}