Repository: kylin Updated Branches: refs/heads/1.5.x-HBase1.x 90a0c5bab -> f6ee0e7c8 (forced update)
KYLIN-1908 Collect Metrics to JMX

Signed-off-by: Yang Li <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0188a264
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0188a264
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0188a264

Branch: refs/heads/1.5.x-HBase1.x
Commit: 0188a264cbc3637638055cea3fd5a898cb9b938d
Parents: e80ddde
Author: kangkaisen <kangkai...@live.com>
Authored: Sun Jul 24 16:47:55 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun Aug 21 19:11:21 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  22 +++
 .../test_case_data/sandbox/kylin.properties     |   5 +-
 .../kylin/rest/controller/QueryController.java  |   8 +
 .../kylin/rest/init/InitialTaskManager.java     |   4 +
 .../apache/kylin/rest/metrics/QueryMetrics.java | 147 +++++++++++++++++++
 .../kylin/rest/util/QueryMetricsUtil.java       | 124 ++++++++++++++++
 .../kylin/rest/metrics/QueryMetricsTest.java    | 116 +++++++++++++++
 7 files changed, 425 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9a8e6fd..50205fb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -106,6 +106,19 @@ abstract public class KylinConfigBase implements Serializable {
         }
     }
 
+    /**
+     * Reads {@code prop} through getOptionalStringArray (with {@code dft} as its default) and parses
+     * every element as an int. Elements are trimmed first so values written as "60, 300, 3600" parse.
+     */
+    final protected int[] getOptionalIntArray(String prop, String[] dft) {
+        String[] strArray = getOptionalStringArray(prop, dft);
+        int[] intArray = new int[strArray.length];
+        for (int i = 0; i < strArray.length; i++) {
+            intArray[i] = Integer.parseInt(strArray[i].trim());
+        }
+        return intArray;
+    }
+
     final public String getRequired(String prop) {
         String r = getOptional(prop);
         if (StringUtils.isEmpty(r)) {
@@ -561,6 +574,15 @@ abstract public class KylinConfigBase implements Serializable {
         return Long.valueOf(this.getOptional("kylin.query.sequence.expire.time", "86400000"));//default a day
     }
 
+    /**
+     * Rollover intervals, in seconds, of the percentile (quantile) metrics kept by QueryMetrics.
+     * Defaults to 60, 300 and 3600.
+     */
+    public int[] getQueryMetricsPercentilesIntervals() {
+        String[] dft = { "60", "300", "3600" };
+        return getOptionalIntArray("kylin.query.metrics.percentiles.intervals", dft);
+    }
+
     public int getHBaseKeyValueSize() {
         return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 996b2b8..a6f89df 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -60,7 +60,7 @@ kylin.job.retry=0
 
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self
 # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
-# It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine 
+# It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
 # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands)
 kylin.job.run.as.remote.cmd=false
 
@@ -147,5 +147,8 @@ kylin.web.contact_mail=
 
 ### OTHER ###
 
+# kylin query metrics percentiles intervals default=60, 300, 3600
+kylin.query.metrics.percentiles.intervals=60, 300, 3600
+
 # Env DEV|QA|PROD
 deploy.env=DEV

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index f61a90e..6f8a7e1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.PostConstruct;
 import javax.servlet.http.HttpServletResponse;
@@ -32,6 +33,7 @@ import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.metrics.QueryMetrics;
 import org.apache.kylin.rest.model.Query;
 import org.apache.kylin.rest.model.SelectedColumnMeta;
 import org.apache.kylin.rest.model.TableMeta;
@@ -41,6 +43,7 @@ import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.request.SaveSqlRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.util.QueryMetricsUtil;
 import org.apache.kylin.rest.util.QueryUtil;
 import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
@@ -74,6 +77,9 @@ public class QueryController extends BasicController {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryController.class);
 
+    // metrics sources keyed by metric name, reused across queries so each name is registered once
+    private final ConcurrentHashMap<String, QueryMetrics> metricsMap = new ConcurrentHashMap<String, QueryMetrics>();
+
     public static final String SUCCESS_QUERY_CACHE = "StorageCache";
     public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
 
@@ -219,6 +225,8 @@ public class QueryController extends BasicController {
 
             queryService.logQuery(sqlRequest, sqlResponse);
 
+            QueryMetricsUtil.updateMetrics(sqlRequest, sqlResponse, metricsMap);
+
             if (sqlResponse.getIsException())
                 throw new InternalErrorException(sqlResponse.getExceptionMessage());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 8912968..430d8a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.rest.init;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,9 @@ public class InitialTaskManager implements InitializingBean {
         logger.info("Kylin service is starting.....");
 
         runInitialTasks();
+
+        // init metrics system for kylin
+        DefaultMetricsSystem.initialize("Kylin");
     }
 
     private void runInitialTasks() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetrics.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetrics.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetrics.java
new file mode 100644
index 0000000..eb1bed6
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetrics.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.metrics;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Hadoop metrics2 source holding the query statistics of one scope: QueryMetricsUtil keeps one
+ * instance for the whole server, one per project and one per project/cube pair.
+ *
+ * <p>The {@code @Metric} counters are not assigned by the constructor; they are expected to be
+ * created by the metrics system when the instance is registered through {@link #registerWith(String)},
+ * so register before calling any {@code incr*} method or {@link #addCacheHitCount(long)}.
+ */
+@ThreadSafe
+@Metrics(name = "Query", about = "Query metrics", context = "Kylin")
+public class QueryMetrics {
+
+    final MetricsRegistry registry = new MetricsRegistry("Query");
+
+    @Metric
+    MutableCounterLong querySuccessCount;
+    @Metric
+    MutableCounterLong queryFailCount;
+    @Metric
+    MutableCounterLong queryCount;
+    @Metric
+    MutableCounterLong cacheHitCount;
+    MutableQuantiles[] cacheHitCountQuantiles;
+
+    @Metric
+    MutableRate queryLatency;
+    MutableQuantiles[] queryLatencyTimeMillisQuantiles;
+
+    @Metric
+    MutableRate scanRowCount;
+    MutableQuantiles[] scanRowCountQuantiles;
+
+    @Metric
+    MutableRate resultRowCount;
+    MutableQuantiles[] resultRowCountQuantiles;
+
+    /**
+     * @param intervals rollover intervals, in seconds, of the quantile estimators; one estimator per
+     *                  interval is created for each of latency, scan rows, result rows and cache hits
+     */
+    public QueryMetrics(int[] intervals) {
+        queryLatencyTimeMillisQuantiles = new MutableQuantiles[intervals.length];
+        scanRowCountQuantiles = new MutableQuantiles[intervals.length];
+        resultRowCountQuantiles = new MutableQuantiles[intervals.length];
+        cacheHitCountQuantiles = new MutableQuantiles[intervals.length];
+
+        for (int i = 0; i < intervals.length; i++) {
+            int interval = intervals[i];
+
+            queryLatencyTimeMillisQuantiles[i] = registry.newQuantiles("QueryLatency" + interval + "s", "Query latency in milliseconds", "ops", "", interval);
+            scanRowCountQuantiles[i] = registry.newQuantiles("ScanRowCount" + interval + "s", "Scan row count", "ops", "", interval);
+            resultRowCountQuantiles[i] = registry.newQuantiles("ResultRowCount" + interval + "s", "Result row count", "ops", "", interval);
+            cacheHitCountQuantiles[i] = registry.newQuantiles("CacheHitCount" + interval + "s", "Cache hit count", "ops", "", interval);
+        }
+
+        queryLatency = registry.newRate("QueryLatency", "", true);
+        scanRowCount = registry.newRate("ScanRowCount", "", true);
+        resultRowCount = registry.newRate("ResultRowCount", "", true);
+    }
+
+    /**
+     * Shuts down the process-wide {@link DefaultMetricsSystem}. This is not scoped to this instance:
+     * it affects every metrics source registered with that system.
+     */
+    public void shutdown() {
+        DefaultMetricsSystem.shutdown();
+    }
+
+    public void incrQuerySuccessCount() {
+        querySuccessCount.incr();
+    }
+
+    public void incrQueryFailCount() {
+        queryFailCount.incr();
+    }
+
+    public void incrQueryCount() {
+        queryCount.incr();
+    }
+
+    public void addQueryLatency(long latency) {
+        queryLatency.add(latency);
+        for (MutableQuantiles m : queryLatencyTimeMillisQuantiles) {
+            m.add(latency);
+        }
+    }
+
+    public void addScanRowCount(long count) {
+        scanRowCount.add(count);
+        for (MutableQuantiles m : scanRowCountQuantiles) {
+            m.add(count);
+        }
+    }
+
+    public void addResultRowCount(long count) {
+        resultRowCount.add(count);
+        for (MutableQuantiles m : resultRowCountQuantiles) {
+            m.add(count);
+        }
+    }
+
+    public void addCacheHitCount(long count) {
+        cacheHitCount.incr(count);
+        for (MutableQuantiles m : cacheHitCountQuantiles) {
+            m.add(count);
+        }
+    }
+
+    /**
+     * Registers this instance with {@link DefaultMetricsSystem} under {@code name} and returns the
+     * registered source. Hadoop's metrics system rejects a duplicate source name once it has been
+     * initialized, so each name must be registered at most once.
+     */
+    public QueryMetrics registerWith(String name) {
+        return DefaultMetricsSystem.instance().register(name, "Query", this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/server-base/src/main/java/org/apache/kylin/rest/util/QueryMetricsUtil.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryMetricsUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryMetricsUtil.java
new file mode 100644
index 0000000..0c1c780
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryMetricsUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.rest.metrics.QueryMetrics;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Updates the query metrics after each query: one {@link QueryMetrics} source for the whole server
+ * ("Server_Total"), one per project and one per project/cube pair ("project,sub=cube").
+ *
+ * <p>Metrics collection is best-effort: failures while updating a source are logged rather than
+ * propagated, so a metrics problem cannot fail the query being served.
+ */
+@ThreadSafe
+public class QueryMetricsUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(QueryMetricsUtil.class);
+
+    private QueryMetricsUtil() {
+        // static utility class, not meant to be instantiated
+    }
+
+    /**
+     * Records one finished query in the server-wide, per-project and per-cube metrics.
+     *
+     * @param sqlRequest  supplies the project name
+     * @param sqlResponse supplies the cube name, duration, row counts and cache/exception flags
+     * @param metricsMap  registered metrics sources keyed by metric name; must be shared across calls
+     *                    so that each name is registered only once
+     */
+    public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse, ConcurrentHashMap<String, QueryMetrics> metricsMap) {
+        String projectName = sqlRequest.getProject();
+        String cubeName = sqlResponse.getCube();
+
+        update("Server_Total", metricsMap, sqlResponse);
+
+        if (projectName == null) {
+            // ConcurrentHashMap rejects null keys, so there is no project-level source to update
+            return;
+        }
+        update(projectName, metricsMap, sqlResponse);
+
+        String cubeMetricName = projectName + ",sub=" + cubeName;
+        update(cubeMetricName, metricsMap, sqlResponse);
+    }
+
+    /** Records {@code sqlResponse} in the source named {@code name}; logs instead of throwing. */
+    private static void update(String name, ConcurrentHashMap<String, QueryMetrics> metricsMap, SQLResponse sqlResponse) {
+        try {
+            QueryMetrics queryMetrics = getQueryMetrics(name, metricsMap);
+            incrQueryCount(queryMetrics, sqlResponse);
+            incrCacheHitCount(queryMetrics, sqlResponse);
+
+            if (!sqlResponse.getIsException()) {
+                queryMetrics.addQueryLatency(sqlResponse.getDuration());
+                queryMetrics.addScanRowCount(sqlResponse.getTotalScanCount());
+                queryMetrics.addResultRowCount(sqlResponse.getResults().size());
+            }
+        } catch (Exception e) {
+            logger.error("Failed to update query metrics " + name, e);
+        }
+    }
+
+    private static void incrQueryCount(QueryMetrics queryMetrics, SQLResponse sqlResponse) {
+        if (!sqlResponse.isHitExceptionCache() && !sqlResponse.getIsException()) {
+            queryMetrics.incrQuerySuccessCount();
+        } else {
+            queryMetrics.incrQueryFailCount();
+        }
+        queryMetrics.incrQueryCount();
+    }
+
+    private static void incrCacheHitCount(QueryMetrics queryMetrics, SQLResponse sqlResponse) {
+        if (sqlResponse.isStorageCacheUsed()) {
+            queryMetrics.addCacheHitCount(1);
+        }
+    }
+
+    /**
+     * Returns the source registered under {@code name}, creating and registering it on first use.
+     * Creation is serialized on {@code metricsMap} so that concurrent first queries for the same
+     * name register it only once.
+     */
+    private static QueryMetrics getQueryMetrics(String name, ConcurrentHashMap<String, QueryMetrics> metricsMap) {
+        QueryMetrics queryMetrics = metricsMap.get(name);
+        if (queryMetrics != null) {
+            return queryMetrics;
+        }
+
+        synchronized (metricsMap) {
+            queryMetrics = metricsMap.get(name);
+            if (queryMetrics == null) {
+                int[] intervals = KylinConfig.getInstanceFromEnv().getQueryMetricsPercentilesIntervals();
+                queryMetrics = new QueryMetrics(intervals).registerWith(name);
+                metricsMap.put(name, queryMetrics);
+            }
+            return queryMetrics;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0188a264/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
new file mode 100644
index 0000000..cd7ff7a
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.metrics;
+
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.ServiceTestBase;
+import org.apache.kylin.rest.util.QueryMetricsUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Checks that QueryMetricsUtil publishes query counters and rates over JMX under
+ * "Hadoop:service=Kylin,name=Server_Total".
+ */
+public class QueryMetricsTest extends ServiceTestBase {
+
+    private MBeanServer mBeanServer;
+    private ObjectName objectName;
+    private final ConcurrentHashMap<String, QueryMetrics> metricsMap = new ConcurrentHashMap<String, QueryMetrics>();
+
+    @Before
+    public void setup() throws Exception {
+
+        super.setup();
+
+        mBeanServer = ManagementFactory.getPlatformMBeanServer();
+        objectName = new ObjectName("Hadoop:service=Kylin,name=Server_Total");
+    }
+
+    @Test
+    public void testQueryMetrics() throws Exception {
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql("select * from TEST_KYLIN_FACT");
+        sqlRequest.setProject("default");
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setDuration(10);
+        sqlResponse.setCube("test_cube");
+        sqlResponse.setIsException(false);
+        sqlResponse.setTotalScanCount(100);
+        List<String> list1 = new ArrayList<>();
+        list1.add("111");
+        list1.add("112");
+        List<String> list2 = new ArrayList<>();
+        list2.add("111");
+        list2.add("112");
+        List<List<String>> results = new ArrayList<>();
+        results.add(list1);
+        results.add(list2);
+        sqlResponse.setResults(results);
+        sqlResponse.setStorageCacheUsed(true);
+
+        QueryMetricsUtil.updateMetrics(sqlRequest, sqlResponse, metricsMap);
+
+        Thread.sleep(2000);
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime"));
+
+        SQLResponse sqlResponse2 = new SQLResponse();
+        sqlResponse2.setDuration(10);
+        sqlResponse2.setCube("test_cube");
+        sqlResponse2.setIsException(true);
+
+        QueryMetricsUtil.updateMetrics(sqlRequest, sqlResponse2, metricsMap);
+
+        Thread.sleep(2000);
+
+        Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+
+    }
+}