KYLIN-2624 introduce StorageURL and set timeout for metadata access
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2dfc5bb3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2dfc5bb3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2dfc5bb3 Branch: refs/heads/master Commit: 2dfc5bb36a00245b95deeebb5bb3bca9c353c934 Parents: 7c152c5 Author: Li Yang <liy...@apache.org> Authored: Thu May 18 14:16:17 2017 +0800 Committer: nichunen <zjsy...@sjtu.org> Committed: Fri May 19 13:52:53 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 30 ++- .../org/apache/kylin/common/StorageURL.java | 183 +++++++++++++++++++ .../common/persistence/FileResourceStore.java | 2 +- .../apache/kylin/common/KylinConfigTest.java | 9 +- .../org/apache/kylin/common/StorageURLTest.java | 103 +++++++++++ .../persistence/LocalFileResourceStoreTest.java | 3 +- .../common/persistence/ResourceStoreTest.java | 18 +- .../common/util/LocalFileMetadataTestCase.java | 7 + .../apache/kylin/dict/DictionaryManager.java | 18 +- .../dict/global/AppendTrieDictionaryTest.java | 5 +- .../impl/threadpool/DistributedScheduler.java | 11 +- .../apache/kylin/storage/StorageContext.java | 7 +- .../kylin/engine/mr/steps/CubeReducerTest.java | 2 +- .../engine/mr/steps/NDCuboidMapperTest.java | 2 +- .../kylin/job/BaseTestDistributedScheduler.java | 5 +- .../kylin/storage/hbase/ITStorageTest.java | 3 +- .../apache/kylin/query/schema/OLAPSchema.java | 5 +- .../rest/security/MockAclHBaseStorage.java | 11 +- .../rest/security/RealAclHBaseStorage.java | 15 +- .../apache/kylin/rest/service/QueryService.java | 11 +- .../kylin/storage/hbase/HBaseConnection.java | 51 +++--- .../kylin/storage/hbase/HBaseResourceStore.java | 46 +++-- .../storage/hbase/util/CubeMigrationCLI.java | 20 +- .../hbase/util/GridTableHBaseBenchmark.java | 3 +- .../kylin/storage/hdfs/HDFSResourceStore.java | 50 ++--- .../hbase/steps/CubeHFileMapper2Test.java | 4 +- .../storage/hbase/steps/MockupMapContext.java | 2 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 20 +- 28 files changed, 461 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b620924..caef95e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -169,7 +169,7 @@ abstract public class KylinConfigBase implements Serializable { } public String toString() { - return getMetadataUrl(); + return getMetadataUrl().toString(); } // ============================================================================ @@ -205,8 +205,8 @@ abstract public class KylinConfigBase implements Serializable { // METADATA // ============================================================================ - public String getMetadataUrl() { - return getOptional("kylin.metadata.url"); + public StorageURL getMetadataUrl() { + return StorageURL.valueOf(getOptional("kylin.metadata.url", "")); } // for test only @@ -215,14 +215,7 @@ abstract public class KylinConfigBase implements Serializable { } public String getMetadataUrlPrefix() { - String metadataUrl = getMetadataUrl(); - String defaultPrefix = "kylin_metadata"; - - int cut = metadataUrl.lastIndexOf('@'); - if (cut > 0) { - return metadataUrl.substring(0, cut); - } - return defaultPrefix; + return getMetadataUrl().getIdentifier(); } public String[] getRealizationProviders() { @@ -558,8 +551,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.storage.default", "2")); } - public String getStorageUrl() { - return getOptional("kylin.storage.url"); + public StorageURL getStorageUrl() { + String url = getOptional("kylin.storage.url", "default@hbase"); + + // for backward compatibility + if ("hbase".equals(url)) + url = "default@hbase"; + + return StorageURL.valueOf(url); } public String getHBaseClusterFs() { @@ -987,10 +986,7 @@ abstract public class KylinConfigBase implements Serializable { } public String getResourceStoreImpl() { - String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl(); - int cut = metadataUrl.indexOf('@'); - String key = cut < 0 ? "" : metadataUrl.substring(cut + 1); - return getResourceStoreImpls().get(key); + return getResourceStoreImpls().get(getMetadataUrl().getScheme()); } public String getJobTrackingURLPattern() { http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/main/java/org/apache/kylin/common/StorageURL.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/StorageURL.java b/core-common/src/main/java/org/apache/kylin/common/StorageURL.java new file mode 100644 index 0000000..cebbc27 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/StorageURL.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; + +/** + * The object form of metadata/storage URL: IDENTIFIER@SCHEME[,PARAM=VALUE,PARAM=VALUE...] + * + * It is not standard URL, but a string of specific format that shares some similar parts with URL. + * + * Immutable by design. + */ +public class StorageURL { + + private static final LoadingCache<String, StorageURL> cache = CacheBuilder.newBuilder()// + .maximumSize(100)// + .build(new CacheLoader<String, StorageURL>() { + @Override + public StorageURL load(String metadataUrl) throws Exception { + return new StorageURL(metadataUrl); + } + }); + + public static StorageURL valueOf(String metadataUrl) { + try { + return cache.get(metadataUrl); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + // ============================================================================ + + final String identifier; + final String scheme; + final Map<String, String> params; + + // package private for test + StorageURL(String metadataUrl) { + boolean first = true; + String n = null; + String s = null; + Map<String, String> m = new LinkedHashMap<>(); + + for (String split : metadataUrl.split(",")) { + if (first) { + // identifier @ scheme + int cut = split.lastIndexOf('@'); + if (cut < 0) { + n = split.trim(); + s = ""; + } else { + n = split.substring(0, cut).trim(); + s = split.substring(cut + 1).trim(); + } + first = false; + } else { + // param = value + int cut = split.indexOf('='); + String k, v; + if (cut < 0) { + k = split.trim(); + v = ""; + } else { + k = split.substring(0, cut).trim(); + v = split.substring(cut + 1).trim(); + } + m.put(k, v); + } + } + + this.identifier = n.isEmpty() ? "kylin_metadata" : n; + this.scheme = s; + this.params = ImmutableMap.copyOf(m); + } + + private StorageURL(String identifier, String scheme, Map<String, String> params) { + this.identifier = identifier; + this.scheme = scheme; + this.params = ImmutableMap.copyOf(params); + } + + public String getIdentifier() { + return identifier; + } + + public String getScheme() { + return scheme; + } + + public boolean containsParameter(String k) { + return params.containsKey(k); + } + + public String getParameter(String k) { + return params.get(k); + } + + public Map<String, String> getAllParameters() { + return params; + } + + public StorageURL copy(Map<String, String> params) { + return new StorageURL(identifier, scheme, params); + } + + @Override + public String toString() { + String str = identifier; + if (!scheme.isEmpty()) + str += "@" + scheme; + + for (Entry<String, String> kv : params.entrySet()) { + str += "," + kv.getKey(); + if (!kv.getValue().isEmpty()) + str += "=" + kv.getValue(); + } + return str; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((identifier == null) ? 0 : identifier.hashCode()); + result = prime * result + ((params == null) ? 0 : params.hashCode()); + result = prime * result + ((scheme == null) ? 0 : scheme.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + StorageURL other = (StorageURL) obj; + if (identifier == null) { + if (other.identifier != null) + return false; + } else if (!identifier.equals(other.identifier)) + return false; + if (params == null) { + if (other.params != null) + return false; + } else if (!params.equals(other.params)) + return false; + if (scheme == null) { + if (other.scheme != null) + return false; + } else if (!scheme.equals(other.scheme)) + return false; + return true; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index d84e587..82cf451 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -44,7 +44,7 @@ public class FileResourceStore extends ResourceStore { public FileResourceStore(KylinConfig kylinConfig) { super(kylinConfig); - root = new File(kylinConfig.getMetadataUrl()).getAbsoluteFile(); + root = new File(kylinConfig.getMetadataUrl().getIdentifier()).getAbsoluteFile(); if (root.exists() == false) throw new IllegalArgumentException("File not exist by '" + kylinConfig.getMetadataUrl() + "': " + root.getAbsolutePath()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java index 3976c6c..6027af3 100644 --- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java @@ -86,7 +86,6 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase { @Test public void testGetMetadataUrlPrefix() { KylinConfig config = KylinConfig.getInstanceFromEnv(); - final String default_metadata_prefix = "kylin_metadata"; config.setMetadataUrl("testMetaPrefix@hbase"); assertEquals("testMetaPrefix", config.getMetadataUrlPrefix()); @@ -95,7 +94,7 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase { assertEquals("testMetaPrefix", config.getMetadataUrlPrefix()); config.setMetadataUrl("/kylin/temp"); - assertEquals(default_metadata_prefix, config.getMetadataUrlPrefix()); + assertEquals("/kylin/temp", config.getMetadataUrlPrefix()); } @Test @@ -107,21 +106,21 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase { KylinConfig sysConfig = KylinConfig.getInstanceFromEnv(); sysConfig.setMetadataUrl(metadata1); - assertEquals(metadata1, KylinConfig.getInstanceFromEnv().getMetadataUrl()); + assertEquals(metadata1, KylinConfig.getInstanceFromEnv().getMetadataUrl().toString()); // test thread-local override KylinConfig threadConfig = KylinConfig.createKylinConfig(new Properties()); threadConfig.setMetadataUrl(metadata2); KylinConfig.setKylinConfigThreadLocal(threadConfig); - assertEquals(metadata2, KylinConfig.getInstanceFromEnv().getMetadataUrl()); + assertEquals(metadata2, KylinConfig.getInstanceFromEnv().getMetadataUrl().toString()); // other threads still use system KylinConfig new Thread(new Runnable() { @Override public void run() { System.out.println("Started new thread."); - assertEquals(metadata1, KylinConfig.getInstanceFromEnv().getMetadataUrl()); + assertEquals(metadata1, KylinConfig.getInstanceFromEnv().getMetadataUrl().toString()); } }).start(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/test/java/org/apache/kylin/common/StorageURLTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/StorageURLTest.java b/core-common/src/test/java/org/apache/kylin/common/StorageURLTest.java new file mode 100644 index 0000000..eaa7548 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/StorageURLTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class StorageURLTest { + + @Test + public void testBasic() { + { + StorageURL id = new StorageURL("hello@hbase"); + assertEquals("hello", id.getIdentifier()); + assertEquals("hbase", id.getScheme()); + assertEquals(0, id.getAllParameters().size()); + assertEquals("hello@hbase", id.toString()); + } + { + StorageURL id = new StorageURL("hello@hbase,a=b,c=d"); + assertEquals("hello", id.getIdentifier()); + assertEquals("hbase", id.getScheme()); + assertEquals(2, id.getAllParameters().size()); + assertEquals("b", id.getParameter("a")); + assertEquals("d", id.getParameter("c")); + assertEquals("hello@hbase,a=b,c=d", id.toString()); + } + { + StorageURL o = new StorageURL("hello@hbase,c=d"); + StorageURL o2 = new StorageURL("hello@hbase,a=b"); + StorageURL id = o.copy(o2.getAllParameters()); + assertEquals("hello", id.getIdentifier()); + assertEquals("hbase", id.getScheme()); + assertEquals(1, id.getAllParameters().size()); + assertEquals("b", id.getParameter("a")); + assertEquals("hello@hbase,a=b", id.toString()); + assertEquals("hello@hbase,c=d", o.toString()); + assertEquals("hello@hbase,a=b", o2.toString()); + } + } + + @Test(expected = NullPointerException.class) + public void testNullInput() { + new StorageURL(null); + } + + @Test + public void testEdgeCases() { + { + StorageURL id = new StorageURL(""); + assertEquals("kylin_metadata", id.getIdentifier()); + assertEquals("", id.getScheme()); + assertEquals(0, id.getAllParameters().size()); + assertEquals("kylin_metadata", id.toString()); + } + { + StorageURL id = new StorageURL("hello@"); + assertEquals("hello", id.getIdentifier()); + assertEquals("", id.getScheme()); + assertEquals(0, id.getAllParameters().size()); + assertEquals("hello", id.toString()); + } + { + StorageURL id = new StorageURL("hello@hbase,a"); + assertEquals("hello", id.getIdentifier()); + assertEquals("hbase", id.getScheme()); + assertEquals(1, id.getAllParameters().size()); + assertEquals("", id.getParameter("a")); + assertEquals("hello@hbase,a", id.toString()); + } + } + + @Test + public void testValueOfCache() { + StorageURL id1 = StorageURL.valueOf("hello@hbase"); + StorageURL id2 = StorageURL.valueOf("hello@hbase"); + StorageURL id3 = StorageURL.valueOf("hello @ hbase"); + StorageURL id4 = StorageURL.valueOf("hello@hbase,a=b"); + assertTrue(id1 == id2); + assertTrue(id1 != id3); + assertTrue(id1.equals(id3)); + assertTrue(id2 != id4); + assertTrue(!id2.equals(id4)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java index aca4a0a..63eb04b 100644 --- a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java @@ -43,7 +43,8 @@ public class LocalFileResourceStoreTest extends LocalFileMetadataTestCase { @Test public void testFileStore() throws Exception { - ResourceStoreTest.testAStore("", KylinConfig.getInstanceFromEnv()); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + ResourceStoreTest.testAStore(config.getMetadataUrl().toString(), config); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java index 1f563b0..8aff1a3 100644 --- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java @@ -18,19 +18,19 @@ package org.apache.kylin.common.persistence; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; import java.util.NavigableSet; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Be called by LocalFileResourceStoreTest, ITHBaseResourceStoreTest and ITHDFSResourceStoreTest. @@ -200,7 +200,7 @@ public class ResourceStoreTest { } public static String replaceMetadataUrl(KylinConfig kylinConfig, String newUrl) { - String oldUrl = kylinConfig.getMetadataUrl(); + String oldUrl = kylinConfig.getMetadataUrl().toString(); kylinConfig.setProperty("kylin.metadata.url", newUrl); return oldUrl; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java index 4a24941..e5e4f8b 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java @@ -76,6 +76,13 @@ public class LocalFileMetadataTestCase extends AbstractKylinTestCase { public void cleanupTestMetadata() { cleanAfterClass(); } + + protected String getLocalWorkingDirectory() { + String dir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); + if (dir.startsWith("file://")) + dir = dir.substring("file://".length()); + return dir; + } protected ResourceStore getStore() { return ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 427bd14..4c4e334 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -90,13 +90,17 @@ public class DictionaryManager { private DictionaryManager(KylinConfig config) { this.config = config; - this.dictCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, DictionaryInfo>() { - @Override - public void onRemoval(RemovalNotification<String, DictionaryInfo> notification) { - DictionaryManager.logger.info("Dict with resource path " + notification.getKey() + " is removed due to " + notification.getCause()); - } - }).maximumSize(config.getCachedDictMaxEntrySize())// - .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, DictionaryInfo>() { + this.dictCache = CacheBuilder.newBuilder()// + .softValues()// + .removalListener(new RemovalListener<String, DictionaryInfo>() { + @Override + public void onRemoval(RemovalNotification<String, DictionaryInfo> notification) { + DictionaryManager.logger.info("Dict with resource path " + notification.getKey() + " is removed due to " + notification.getCause()); + } + })// + .maximumSize(config.getCachedDictMaxEntrySize())// + .expireAfterWrite(1, TimeUnit.DAYS)// + .build(new CacheLoader<String, DictionaryInfo>() { @Override public DictionaryInfo load(String key) throws Exception { DictionaryInfo dictInfo = DictionaryManager.this.load(key, true); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java index f6ba75a..47011fe 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/global/AppendTrieDictionaryTest.java @@ -46,8 +46,8 @@ import java.util.UUID; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.dict.AppendTrieDictionary; import org.apache.kylin.dict.BytesConverter; @@ -60,13 +60,14 @@ import org.junit.Test; public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID(); private static String BASE_DIR; - private static String LOCAL_BASE_DIR = "/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/"; + private static String LOCAL_BASE_DIR; @Before public void beforeTest() { staticCreateTestMetadata(); KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; + LOCAL_BASE_DIR = getLocalWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index fa1b8e0..fd7b43a 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -320,13 +320,20 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } public String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName); } private String getWatchPath() { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix()); } + public static String dropDoubleSlash(String path) { + for (int n = Integer.MAX_VALUE; n > path.length();) { + n = path.length(); + path = path.replace("//", "/"); + } + return path; + } @Override public void shutdown() throws SchedulerException { logger.info("Will shut down Job Engine ...."); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 4522261..78cf97c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -20,6 +20,7 @@ package org.apache.kylin.storage; import java.util.concurrent.atomic.AtomicLong; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.realization.IRealization; @@ -35,7 +36,7 @@ import com.google.common.collect.Range; public class StorageContext { private static final Logger logger = LoggerFactory.getLogger(StorageContext.class); - private String connUrl; + private StorageURL connUrl; private int limit = Integer.MAX_VALUE; private boolean overlookOuterLimit = false; private int offset = 0; @@ -56,11 +57,11 @@ public class StorageContext { private Range<Long> reusedPeriod; - public String getConnUrl() { + public StorageURL getConnUrl() { return connUrl; } - public void setConnUrl(String connUrl) { + public void setConnUrl(StorageURL connUrl) { this.connUrl = connUrl; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index 29ca9b8..7616df2 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -60,7 +60,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { // hack for distributed cache FileUtils.deleteDirectory(new File("../job/meta")); - FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta")); + FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("../job/meta")); CuboidReducer reducer = new CuboidReducer(); reduceDriver = ReduceDriver.newReduceDriver(reducer); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java index 815d9b3..c0ce1a4 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java @@ -54,7 +54,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { // hack for distributed cache FileUtils.deleteDirectory(new File("./meta")); - FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("./meta")); + FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta")); NDCuboidMapper mapper = new NDCuboidMapper(); CuboidReducer reducer = new CuboidReducer(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index aa96e2e..f410f59 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -76,7 +76,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { KylinConfig srcConfig = KylinConfig.getInstanceFromEnv(); localMetaDir = Files.createTempDir(); - backup = srcConfig.getMetadataUrl(); + backup = srcConfig.getMetadataUrl().toString(); srcConfig.setProperty("kylin.metadata.url", localMetaDir.getAbsolutePath()); srcConfig.writeProperties(new File(confDstPath1)); srcConfig.writeProperties(new File(confDstPath2)); @@ -198,6 +198,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { } private String getLockPath(String pathName) { - return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName; + return DistributedScheduler.dropDoubleSlash(// + DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index 733ca06..847baf8 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -71,9 +71,8 @@ public class ITStorageTest extends HBaseMetadataTestCase { cube = cubeMgr.getCube("test_kylin_cube_without_slr_left_join_empty"); Assert.assertNotNull(cube); storageEngine = StorageFactory.createQuery(cube); - String url = KylinConfig.getInstanceFromEnv().getStorageUrl(); context = new StorageContext(); - context.setConnUrl(url); + context.setConnUrl(KylinConfig.getInstanceFromEnv().getStorageUrl()); mockup = new StorageMockUtils(cube.getModel()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java index 52023e4..9124b15 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchema.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; @@ -40,7 +41,7 @@ public class OLAPSchema extends AbstractSchema { private KylinConfig config; private String projectName; private String schemaName; - private String storageUrl; + private StorageURL storageUrl; private String starSchemaUrl; private String starSchemaUser; private String starSchemaPassword; @@ -89,7 +90,7 @@ public class OLAPSchema extends AbstractSchema { return schemaName; } - public String getStorageUrl() { + public StorageURL getStorageUrl() { return storageUrl; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java index 72d44bb..ca49641 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java +++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java @@ -18,14 +18,15 @@ package org.apache.kylin.rest.security; +import java.io.IOException; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.rest.service.LegacyAclService; -import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.service.LegacyUserService; - -import java.io.IOException; +import org.apache.kylin.rest.service.QueryService; /** */ @@ -40,8 +41,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage { private RealAclHBaseStorage realAcl; public MockAclHBaseStorage() { - String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl(); - if (metadataUrl != null && metadataUrl.endsWith("hbase")) { + StorageURL metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl(); + if (metadataUrl.getScheme().endsWith("hbase")) { // hbase must be available since metadata is on it // in this case, let us use a real ACL instead of mockup realAcl = new RealAclHBaseStorage(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java index f42b3f2..98cef3a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java +++ b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java @@ -18,33 +18,32 @@ package org.apache.kylin.rest.security; +import java.io.IOException; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.rest.service.LegacyAclService; -import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.service.LegacyUserService; +import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.storage.hbase.HBaseConnection; -import java.io.IOException; - /** */ @Deprecated public class RealAclHBaseStorage implements AclHBaseStorage { - private String hbaseUrl; + private StorageURL hbaseUrl; private String aclTableName; private String userTableName; @Override public String prepareHBaseTable(Class<?> clazz) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String metadataUrl = kylinConfig.getMetadataUrl(); - int cut = metadataUrl.indexOf('@'); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - String tableNameBase = kylinConfig.getMetadataUrlPrefix(); + hbaseUrl = kylinConfig.getMetadataUrl(); + String tableNameBase = hbaseUrl.getIdentifier(); if (clazz == LegacyAclService.class) { aclTableName = tableNameBase + ACL_TABLE_NAME; http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 1ddf4a3..9eab5a3 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.util.Bytes; @@ -115,7 +116,7 @@ public class QueryService extends BasicService { private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class); private final BadQueryDetector badQueryDetector = new BadQueryDetector(); - private final String hbaseUrl; + private final StorageURL hbaseUrl; private final String userTableName; @Autowired @@ -134,12 +135,8 @@ public class QueryService extends BasicService { public QueryService() { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String metadataUrl = kylinConfig.getMetadataUrl(); - // split TABLE@HBASE_URL - int cut = metadataUrl.indexOf('@'); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - String tableNameBase = kylinConfig.getMetadataUrlPrefix(); - userTableName = tableNameBase + USER_TABLE_NAME; + hbaseUrl = kylinConfig.getMetadataUrl(); + userTableName = hbaseUrl.getIdentifier() + USER_TABLE_NAME; badQueryDetector.start(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index f0b9428..2bbc2bc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -18,7 +18,19 @@ package org.apache.kylin.storage.hbase; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -33,22 +45,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; /** * @author yangli9 @@ -60,8 +63,8 @@ public class HBaseConnection { private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); - private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>(); - private static final Map<String, Connection> connPool = new ConcurrentHashMap<String, Connection>(); + private static final Map<StorageURL, Configuration> configCache = new ConcurrentHashMap<StorageURL, Configuration>(); + private static final Map<StorageURL, Connection> connPool = new ConcurrentHashMap<StorageURL, Connection>(); private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>(); private static ExecutorService coprocessorPool = null; @@ -76,7 +79,7 @@ public class HBaseConnection { try { conn.close(); } catch (IOException e) { - e.printStackTrace(); + logger.error("error closing hbase connection " + conn, e); } } } @@ -131,15 +134,15 @@ public class HBaseConnection { public static Configuration getCurrentHBaseConfiguration() { if (configThreadLocal.get() == null) { - String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl(); + StorageURL storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl(); configThreadLocal.set(newHBaseConfiguration(storageUrl)); } return configThreadLocal.get(); } - private static Configuration newHBaseConfiguration(String url) { + private static Configuration newHBaseConfiguration(StorageURL url) { // using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath - if (!(StringUtils.isEmpty(url) || "hbase".equals(url))) + if (!"hbase".equals(url.getScheme())) throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties"); Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); @@ -159,6 +162,10 @@ public class HBaseConnection { if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) { conf.set("hbase.fs.tmp.dir", "/tmp"); } + + for (Entry<String, String> entry : url.getAllParameters().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } return conf; } @@ -210,7 +217,7 @@ public class HBaseConnection { // returned Connection can be shared by multiple threads and does not require close() @SuppressWarnings("resource") - public static Connection get(String url) { + public static Connection get(StorageURL url) { // find configuration Configuration conf = configCache.get(url); if (conf == null) { @@ -252,15 +259,15 @@ public class HBaseConnection { } } - public static boolean tableExists(String hbaseUrl, String tableName) throws IOException { + public static boolean tableExists(StorageURL hbaseUrl, String tableName) throws IOException { return tableExists(HBaseConnection.get(hbaseUrl), tableName); } - public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { + public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families) throws IOException { createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); } - public static void deleteTable(String hbaseUrl, String tableName) throws IOException { + public static void deleteTable(StorageURL hbaseUrl, String tableName) throws IOException { deleteTable(HBaseConnection.get(hbaseUrl), tableName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 76aefe0..a2e0229 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -22,7 +22,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.TreeSet; import java.util.UUID; @@ -31,13 +33,12 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.StringEntity; @@ -71,25 +73,34 @@ public class HBaseResourceStore extends ResourceStore { private static final String COLUMN_TS = "t"; private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); - final String tableNameBase; - final String hbaseUrl; + final String tableName; + final StorageURL metadataUrl; Connection getConnection() throws IOException { - return HBaseConnection.get(hbaseUrl); + return HBaseConnection.get(metadataUrl); } public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { super(kylinConfig); - String metadataUrl = kylinConfig.getMetadataUrl(); - // split TABLE@HBASE_URL - int cut = metadataUrl.indexOf('@'); - tableNameBase = cut < 0 ? DEFAULT_STORE_NAME : metadataUrl.substring(0, cut); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - if (!hbaseUrl.equals("hbase")) - throw new IOException("Can not create HBaseResourceStore. Url not match. Url:" + hbaseUrl); + metadataUrl = buildMetadataUrl(kylinConfig); + tableName = metadataUrl.getIdentifier(); createHTableIfNeeded(getAllInOneTableName()); + } + + private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException { + StorageURL url = kylinConfig.getMetadataUrl(); + if (!url.getScheme().equals("hbase")) + throw new IOException("Cannot create HBaseResourceStore. Url not match. Url: " + url); + // control timeout for prompt error report + Map<String, String> newParams = new LinkedHashMap<>(); + newParams.put("hbase.client.scanner.timeout.period", "10000"); + newParams.put("hbase.rpc.timeout", "5000"); + newParams.put("hbase.client.retries.number", "1"); + newParams.putAll(url.getAllParameters()); + + return url.copy(newParams); } private void createHTableIfNeeded(String tableName) throws IOException { @@ -97,7 +108,7 @@ public class HBaseResourceStore extends ResourceStore { } private String getAllInOneTableName() { - return tableNameBase; + return tableName; } @Override @@ -120,14 +131,11 @@ public class HBaseResourceStore extends ResourceStore { return result.isEmpty() ? null : result; } - /* - override get meta store uuid method for backward compatibility - */ - + /* override get meta store uuid method for backward compatibility */ @Override public String createMetaStoreUUID() throws IOException { - try (final HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()))) { - final String metaStoreName = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + try (final Admin hbaseAdmin = HBaseConnection.get(metadataUrl).getAdmin()) { + final String metaStoreName = metadataUrl.getIdentifier(); final HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(metaStoreName)); String uuid = desc.getValue(HBaseConnection.HTABLE_UUID_TAG); if (uuid != null) http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 581de38..2154ed1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -172,26 +173,15 @@ public class CubeMigrationCLI { checkCLI.execute(cubeName); } - private static String checkAndGetHbaseUrl() { - String srcMetadataUrl = srcConfig.getMetadataUrl(); - String dstMetadataUrl = dstConfig.getMetadataUrl(); + private static void checkAndGetHbaseUrl() { + StorageURL srcMetadataUrl = srcConfig.getMetadataUrl(); + StorageURL dstMetadataUrl = dstConfig.getMetadataUrl(); logger.info("src metadata url is " + srcMetadataUrl); logger.info("dst metadata url is " + dstMetadataUrl); - int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase"); - int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase"); - if (srcIndex < 0 || dstIndex < 0) + if (!"hbase".equals(srcMetadataUrl.getScheme()) || !"hbase".equals(dstMetadataUrl.getScheme())) throw new IllegalStateException("Both metadata urls should be hbase metadata url"); - - String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim(); - String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim(); - if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) { - throw new IllegalStateException("hbase url not equal! "); - } - - logger.info("hbase url is " + srcHbaseUrl.trim()); - return srcHbaseUrl.trim(); } private static void renameFoldersInHdfs(CubeInstance cube) { http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index dd5f8fa..a317110 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Pair; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -73,7 +74,7 @@ public class GridTableHBaseBenchmark { public static void testGridTable(double hitRatio, double indexRatio) throws IOException { System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); - String hbaseUrl = "hbase"; // use hbase-site.xml on classpath + StorageURL hbaseUrl = StorageURL.valueOf("default@hbase"); // use hbase-site.xml on classpath Connection conn = HBaseConnection.get(hbaseUrl); createHTableIfNeeded(conn, TEST_TABLE); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index b025cf7..fe1ad4e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -18,7 +18,14 @@ package org.apache.kylin.storage.hdfs; -import com.google.common.collect.Lists; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -26,19 +33,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; +import com.google.common.collect.Lists; public class HDFSResourceStore extends ResourceStore { @@ -50,16 +52,12 @@ public class HDFSResourceStore extends ResourceStore { public HDFSResourceStore(KylinConfig kylinConfig) throws Exception { super(kylinConfig); - String metadataUrl = kylinConfig.getMetadataUrl(); - int cut = metadataUrl.indexOf('@'); - if (cut < 0) { - throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore: " + metadataUrl); - } - String suffix = metadataUrl.substring(cut + 1); - if (!suffix.equals("hdfs")) - throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + suffix); + StorageURL metadataUrl = kylinConfig.getMetadataUrl(); + + if (!metadataUrl.getScheme().equals("hdfs")) + throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + metadataUrl); - String path = metadataUrl.substring(0, cut); + String path = metadataUrl.getIdentifier(); fs = HadoopUtil.getFileSystem(path); Path metadataPath = new Path(path); if (fs.exists(metadataPath) == false) { @@ -220,22 +218,4 @@ public class HDFSResourceStore extends ResourceStore { resourcePath = resourcePath.substring(1, resourcePath.length()); return new Path(this.hdfsMetaPath, resourcePath); } - - private static String getRelativePath(Path hdfsPath) { - String path = hdfsPath.toString(); - int index = path.indexOf("://"); - if (index > 0) { - path = path.substring(index + 3); - } - - if (path.startsWith("/") == false) { - if (path.indexOf("/") > 0) { - path = path.substring(path.indexOf("/")); - } else { - path = "/" + path; - } - } - return path; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java index f36a62c..d47f393 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java @@ -55,7 +55,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase { this.createTestMetadata(); // hack for distributed cache FileUtils.deleteDirectory(new File("../job/meta")); - FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta")); + FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("../job/meta")); CubeDesc desc = CubeManager.getInstance(getTestConfig()).getCube(cubeName).getDescriptor(); codec = new MeasureCodec(desc.getMeasures()); } @@ -70,7 +70,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase { public void testBasic() throws Exception { Configuration hconf = HadoopUtil.getCurrentConfiguration(); - Context context = MockupMapContext.create(hconf, getTestConfig().getMetadataUrl(), cubeName, outKV); + Context context = MockupMapContext.create(hconf, cubeName, outKV); CubeHFileMapper mapper = new CubeHFileMapper(); mapper.setup(context); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java index 536634d..61170e8 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java @@ -53,7 +53,7 @@ public class MockupMapContext implements MapContext { private Object[] outKV; - public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) { + public static Context create(final Configuration hconf, String cubeName, final Object[] outKV) { hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName); http://git-wip-us.apache.org/repos/asf/kylin/blob/2dfc5bb3/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index c162a76..08d4292 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -172,26 +173,15 @@ public class CubeMigrationCLI { checkCLI.execute(cubeName); } - private String checkAndGetHbaseUrl() { - String srcMetadataUrl = srcConfig.getMetadataUrl(); - String dstMetadataUrl = dstConfig.getMetadataUrl(); + private void checkAndGetHbaseUrl() { + StorageURL srcMetadataUrl = srcConfig.getMetadataUrl(); + StorageURL dstMetadataUrl = dstConfig.getMetadataUrl(); logger.info("src metadata url is " + srcMetadataUrl); logger.info("dst metadata url is " + dstMetadataUrl); - int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase"); - int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase"); - if (srcIndex < 0 || dstIndex < 0) + if (!"hbase".equals(srcMetadataUrl.getScheme()) || !"hbase".equals(dstMetadataUrl.getScheme())) throw new IllegalStateException("Both metadata urls should be hbase metadata url"); - - String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim(); - String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim(); - if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) { - throw new IllegalStateException("hbase url not equal! "); - } - - logger.info("hbase url is " + srcHbaseUrl.trim()); - return srcHbaseUrl.trim(); } protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {