http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 61ddbb0..5130e55 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -20,6 +20,8 @@ package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.util.CheckUtil.checkCondition;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -31,7 +33,6 @@ import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,21 +47,17 @@ import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.DBUtils;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -92,9 +89,7 @@ import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.AclUtil;
 import org.apache.kylin.rest.util.AdHocUtil;
-import org.apache.kylin.rest.util.Serializer;
 import org.apache.kylin.rest.util.TableauInterceptor;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,6 +100,7 @@ import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -123,18 +119,12 @@ public class QueryService extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
 
-    public static final String USER_QUERY_FAMILY = "q";
-    private static final String USER_TABLE_NAME = "_user";
-    private static final String USER_QUERY_COLUMN = "c";
-
     public static final String SUCCESS_QUERY_CACHE = "StorageCache";
     public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
+    public static final String QUERY_STORE_PATH_PREFIX = "/query/";
 
-    private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
-    protected final BadQueryDetector badQueryDetector = new BadQueryDetector();
-
-    private final StorageURL hbaseUrl;
-    private final String userTableName;
+    final BadQueryDetector badQueryDetector = new BadQueryDetector();
+    final ResourceStore queryStore;
 
     @Autowired
     protected CacheManager cacheManager;
@@ -156,10 +146,7 @@
     }
 
     public QueryService() {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        hbaseUrl = kylinConfig.getMetadataUrl();
-        userTableName = hbaseUrl.getIdentifier() + USER_TABLE_NAME;
-
+        queryStore = ResourceStore.getStore(getConfig());
         badQueryDetector.start();
     }
 
@@ -183,18 +170,10 @@
         List<Query> queries = getQueries(creator);
         queries.add(query);
         Query[] queryArray = new Query[queries.size()];
-
-        byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        Table htable = null;
-        try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Put put = new Put(Bytes.toBytes(creator));
-            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
-            htable.put(put);
-        } finally {
-            IOUtils.closeQuietly(htable);
-        }
+        QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+        queryStore.deleteResource(getQueryKeyById(creator));
+        queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+        return;
     }
 
     public void removeQuery(final String creator, final String id) throws IOException {
@@ -214,45 +193,24 @@
         if (!changed) {
             return;
         }
-
         Query[] queryArray = new Query[queries.size()];
-        byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        Table htable = null;
-        try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Put put = new Put(Bytes.toBytes(creator));
-            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
-            htable.put(put);
-        } finally {
-            IOUtils.closeQuietly(htable);
-        }
+        QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+        queryStore.deleteResource(getQueryKeyById(creator));
+        queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+        return;
     }
 
     public List<Query> getQueries(final String creator) throws IOException {
         if (null == creator) {
             return null;
         }
-
         List<Query> queries = new ArrayList<Query>();
-        Table htable = null;
-        try {
-            org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
-            HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
-
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Get get = new Get(Bytes.toBytes(creator));
-            get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
-            Result result = htable.get(get);
-            Query[] query = querySerializer.deserialize(result.getValue(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN)));
-
-            if (null != query) {
-                queries.addAll(Arrays.asList(query));
+        QueryRecord record = queryStore.getResource(getQueryKeyById(creator), QueryRecord.class, QueryRecordSerializer.getInstance());
+        if (record != null) {
+            for (Query query : record.getQueries()) {
+                queries.add(query);
             }
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
-
         return queries;
     }
@@ -892,4 +850,58 @@ public class QueryService extends BasicService {
     public void setCacheManager(CacheManager cacheManager) {
         this.cacheManager = cacheManager;
     }
+
+    private static String getQueryKeyById(String creator) {
+        return QUERY_STORE_PATH_PREFIX + creator;
+    }
+
+    private static class QueryRecordSerializer implements Serializer<QueryRecord> {
+
+        private static final QueryRecordSerializer serializer = new QueryRecordSerializer();
+
+        QueryRecordSerializer() {
+
+        }
+
+        public static QueryRecordSerializer getInstance() {
+            return serializer;
+        }
+
+        @Override
+        public void serialize(QueryRecord record, DataOutputStream out) throws IOException {
+            String jsonStr = JsonUtil.writeValueAsString(record);
+            out.writeUTF(jsonStr);
+        }
+
+        @Override
+        public QueryRecord deserialize(DataInputStream in) throws IOException {
+            String jsonStr = in.readUTF();
+            return JsonUtil.readValue(jsonStr, QueryRecord.class);
+        }
+    }
+
+}
+
+@SuppressWarnings("serial")
+class QueryRecord extends RootPersistentEntity {
+
+    @JsonProperty()
+    private Query[] queries;
+
+    public QueryRecord() {
+
+    }
+
+    public QueryRecord(Query[] queries) {
+        this.queries = queries;
+    }
+
+    public Query[] getQueries() {
+        return queries;
+    }
+
+    public void setQueries(Query[] queries) {
+        this.queries = queries;
+    }
+}
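Editor's note: the QueryService change above drops the per-user HBase table for saved queries and instead stores one QueryRecord per user in the generic ResourceStore, serialized as a JSON string written through DataOutputStream.writeUTF. Below is a minimal, self-contained sketch of that serialization round trip. It uses Jackson's ObjectMapper in place of Kylin's JsonUtil, and the Query/QueryRecord classes here are simplified stand-ins for illustration, not the real Kylin types.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class QueryRecordRoundTrip {

    // Simplified stand-in for the saved-query model class
    public static class Query {
        public String name;
        public String sql;
        public Query() {}
        public Query(String name, String sql) { this.name = name; this.sql = sql; }
    }

    // Simplified stand-in for QueryRecord: just an array of queries
    public static class QueryRecord {
        public Query[] queries;
        public QueryRecord() {}
        public QueryRecord(Query[] queries) { this.queries = queries; }
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        QueryRecord record = new QueryRecord(new Query[] { new Query("top10", "select 1") });

        // serialize(record, out): write the JSON text with writeUTF
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(mapper.writeValueAsString(record));
        }

        // deserialize(in): read the JSON text back and bind it to the record class
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            QueryRecord restored = mapper.readValue(in.readUTF(), QueryRecord.class);
            System.out.println(restored.queries[0].sql); // prints "select 1"
        }
    }
}

One design note on this approach: DataOutputStream.writeUTF only accepts strings whose encoded form fits in 65535 bytes, so an extremely large saved-query list could fail with UTFDataFormatException; the sketch keeps the payload small.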
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server/src/main/resources/applicationContext.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index 100b202..8416f25 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -112,11 +112,4 @@
           p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
     </beans>
 
-    <!-- hbase storage/global lock Config -->
-    <beans profile="ldap,saml">
-        <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.RealAclHBaseStorage"/>
-    </beans>
-    <beans profile="testing">
-        <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.MockAclHBaseStorage"/>
-    </beans>
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 81349ef..615c845 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -67,10 +67,15 @@ public class HBaseResourceStore extends ResourceStore {
 
     private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
 
     private static final String FAMILY = "f";
+    private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
+
     private static final String COLUMN = "c";
+    private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
+
     private static final String COLUMN_TS = "t";
+    private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
 
     final String tableName;
@@ -82,10 +87,9 @@ public class HBaseResourceStore extends ResourceStore {
 
     public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
         super(kylinConfig);
-        metadataUrl = buildMetadataUrl(kylinConfig);
         tableName = metadataUrl.getIdentifier();
-        createHTableIfNeeded(getAllInOneTableName());
+        createHTableIfNeeded(tableName);
     }
 
     private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
@@ -107,10 +111,6 @@ public class HBaseResourceStore extends ResourceStore {
         HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
     }
 
-    private String getAllInOneTableName() {
-        return tableName;
-    }
-
     @Override
     protected boolean existsImpl(String resPath) throws IOException {
         Result r = getFromHTable(resPath, false, false);
@@ -164,7 +164,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] endRow = Bytes.toBytes(lookForPrefix);
         endRow[endRow.length - 1]++;
 
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         Scan scan = new Scan(startRow, endRow);
         if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
             scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -288,7 +288,7 @@
         IOUtils.copy(content, bout);
         bout.close();
 
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             byte[] row = Bytes.toBytes(resPath);
             Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
@@ -302,7 +302,7 @@
 
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             byte[] row = Bytes.toBytes(resPath);
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -325,7 +325,7 @@
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             boolean hdfsResourceExist = false;
             Result result = internalGetFromHTable(table, resPath, true, false);
@@ -354,11 +354,11 @@
 
     @Override
     protected String getReadableResourcePathImpl(String resPath) {
-        return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+        return tableName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
     }
 
     private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
         } finally {
@@ -429,6 +429,6 @@
 
     @Override
     public String toString() {
-        return getAllInOneTableName() + "@hbase";
+        return tableName + "@hbase";
     }
 }
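Editor's note: the HBaseResourceStore hunks above mainly inline getAllInOneTableName() into the tableName field and cache the column family/qualifier byte arrays (B_FAMILY, B_COLUMN, B_COLUMN_TS) so Bytes.toBytes is not recomputed on every read and write. The following is a rough sketch of that constant-caching pattern only; the table name, row key, and Connection wiring are illustrative, and the real store additionally keeps a timestamp column and can spill large cells to HDFS.

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ResourceTableAccess {

    // Compute the UTF-8 bytes of family/qualifier once, as static constants
    private static final String FAMILY = "f";
    private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
    private static final String COLUMN = "c";
    private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);

    static void putResource(Connection conn, String tableName, String resPath, byte[] content) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(resPath));
            put.addColumn(B_FAMILY, B_COLUMN, content); // reuses the cached byte[] constants
            table.put(put);
        }
    }

    static byte[] getResource(Connection conn, String tableName, String resPath) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(resPath));
            get.addColumn(B_FAMILY, B_COLUMN);
            Result r = table.get(get);
            return r.getValue(B_FAMILY, B_COLUMN);
        }
    }
}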
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 991a750..6e7890b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -30,7 +30,7 @@ import org.apache.kylin.job.lock.JobLock;
 public class ZookeeperJobLock implements DistributedLock, JobLock {
 
     private ZookeeperDistributedLock lock = (ZookeeperDistributedLock) new ZookeeperDistributedLock.Factory().lockForCurrentProcess();
-
+
     @Override
     public String getClient() {
         return lock.getClient();
@@ -60,7 +60,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
     public boolean isLockedByMe(String lockPath) {
         return lock.isLockedByMe(lockPath);
     }
-
+
     @Override
     public void unlock(String lockPath) {
         lock.unlock(lockPath);
@@ -70,6 +70,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
     public void purgeLocks(String lockPathRoot) {
         lock.purgeLocks(lockPathRoot);
     }
+
     @Override
     public Closeable watchLocks(String lockPathRoot, Executor executor, Watcher watcher) {
         return lock.watchLocks(lockPathRoot, executor, watcher);
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
index b5ebe89..20569d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -18,35 +18,17 @@
 
 package org.apache.kylin.storage.hbase.util;
 
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import org.apache.kylin.common.KylinConfig;
 
 public class ZookeeperUtil {
 
+    public static String ZOOKEEPER_UTIL_HBASE_CLASSNAME = "org.apache.kylin.storage.hbase.util.ZooKeeperUtilHbase";
+
     /**
-     * Get zookeeper connection string from HBase Configuration
-     *
-     * @return Zookeeper Connection string
+     * Get zookeeper connection string from HBase Configuration or from kylin.properties
     */
    public static String getZKConnectString() {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-            @Nullable
-            @Override
-            public String apply(String input) {
-                return input + ":" + port;
-            }
-        }), ",");
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        return config.getZookeeperConnectString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index fe1ad4e..d185f4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -40,6 +40,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class HDFSResourceStore extends ResourceStore {
@@ -50,13 +51,13 @@ public class HDFSResourceStore extends ResourceStore {
 
     private FileSystem fs;
 
+    private static final String HDFS_SCHEME = "hdfs";
+
     public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
         StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        Preconditions.checkState(HDFS_SCHEME.equals(metadataUrl.getScheme()));
 
-        if (!metadataUrl.getScheme().equals("hdfs"))
-            throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + metadataUrl);
-
         String path = metadataUrl.getIdentifier();
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);
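Editor's note: the HDFSResourceStore hunk swaps an IOException-based scheme check for a Guava Preconditions.checkState guard in the constructor. A small sketch of that fail-fast style follows; it parses the URL with java.net.URI instead of Kylin's StorageURL, so the parsing details are illustrative only.

import java.net.URI;

import com.google.common.base.Preconditions;

public class MetadataUrlCheck {

    private static final String HDFS_SCHEME = "hdfs";

    static String identifierFor(String metadataUrl) {
        URI url = URI.create(metadataUrl);
        // Fails fast with IllegalStateException instead of a checked IOException
        Preconditions.checkState(HDFS_SCHEME.equals(url.getScheme()),
                "kylin.metadata.url not recognized for HDFSResourceStore: %s", metadataUrl);
        return url.getPath();
    }

    public static void main(String[] args) {
        System.out.println(identifierFor("hdfs:///kylin/kylin_metadata")); // accepted
        System.out.println(identifierFor("hbase:kylin_metadata@hbase"));   // throws IllegalStateException
    }
}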
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index e1f994f..0fdc740 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -20,6 +20,7 @@ package org.apache.kylin.tool;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -159,11 +161,13 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor {
            public void run() {
                logger.info("Start to extract HBase usage.");
                try {
+                    // use reflection to isolate NoClassDef errors when HBase is not available
                    String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" };
-                    HBaseUsageExtractor hBaseUsageExtractor = new HBaseUsageExtractor();
                    logger.info("HBaseUsageExtractor args: " + Arrays.toString(hbaseArgs));
-                    hBaseUsageExtractor.execute(hbaseArgs);
-                } catch (Exception e) {
+                    Object extractor = ClassUtil.newInstance("org.apache.kylin.tool.HBaseUsageExtractor");
+                    Method execute = extractor.getClass().getDeclaredMethod("execute", String[].class);
+                    execute.invoke(extractor, (Object) hbaseArgs);
+                } catch (Throwable e) {
                    logger.error("Error in export HBase usage.", e);
                }
            }
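Editor's note: the DiagnosisInfoCLI hunk loads HBaseUsageExtractor reflectively and widens the catch to Throwable, so a missing HBase classpath degrades to a logged error instead of aborting the whole diagnosis run. A standalone sketch of that pattern follows; it uses Class.forName where the patch uses Kylin's ClassUtil.newInstance, and the argument values are made up for illustration.

import java.lang.reflect.Method;

public class OptionalHBaseExtractor {

    public static void main(String[] args) {
        String[] hbaseArgs = { "-destDir", "/tmp/diag/hbase", "-compress", "false" };
        try {
            // Load and instantiate the extractor by name so there is no compile-time
            // dependency on HBase classes in this code path
            Object extractor = Class.forName("org.apache.kylin.tool.HBaseUsageExtractor")
                    .getDeclaredConstructor().newInstance();
            Method execute = extractor.getClass().getDeclaredMethod("execute", String[].class);
            // the cast to Object keeps varargs from spreading the array into separate arguments
            execute.invoke(extractor, (Object) hbaseArgs);
        } catch (Throwable e) {
            // ClassNotFoundException / NoClassDefFoundError land here; the tool keeps going
            System.err.println("Skipping HBase usage extraction: " + e);
        }
    }
}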