This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ad7b47245c959ea3e57efe1c8f5a737d0b1653ed
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Thu Jan 14 14:51:59 2021 +0800

    add test case

    Add metrics check
---
 build/bin/system-cube.sh                           |  20 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    |  98 ++++----
 examples/test_case_data/localmeta/kylin.properties |   5 +-
 .../spark/sql/metrics/SparderMetricsListener.scala |   2 +-
 .../apache/kylin/rest/init/InitialTaskManager.java |   2 +
 .../kylin/rest/metrics/QueryMetricsFacade.java     |   4 +-
 .../apache/kylin/rest/response/SQLResponse.java    |  10 +
 .../kylin/rest/service/DashboardService.java       |   1 -
 .../apache/kylin/rest/service/QueryService.java    |  12 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       | 271 ++++++++++++++++++++-
 .../tool/metrics/systemcube/CubeDescCreator.java   |   2 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  |   2 +-
 13 files changed, 363 insertions(+), 68 deletions(-)

diff --git a/build/bin/system-cube.sh b/build/bin/system-cube.sh
index 20f7861..ca35970 100644
--- a/build/bin/system-cube.sh
+++ b/build/bin/system-cube.sh
@@ -74,18 +74,14 @@ then
   cat <<-EOF > ${SINK_TOOLS_FILE}
   [
     [
-      "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool",
-      {
-        "storage_type": 2,
-        "cube_desc_override_properties": [
-          "java.util.HashMap",
-          {
-            "kylin.cube.algorithm": "INMEM",
-            "kylin.cube.max-building-segments": "1"
-          }
-        ]
-      }
-    ]
+      {
+        "sink": "hive",
+        "storage_type": 4,
+        "cube_desc_override_properties": {
+          "kylin.cube.max-building-segments": "1"
+        }
+      }
+    ]
   ]
 EOF
   $KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator \
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36950ec..21f05db 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2364,7 +2364,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getKylinMetricsCacheExpireSeconds() {
-        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "300"));
    }
 
     public int getKylinMetricsCacheMaxEntries() {
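Note on the config change above: the query-metrics cache now expires entries after 300 seconds by default instead of 600. A minimal sketch of how a caller reads the two tunables (both getters appear in KylinConfigBase above; the standalone main is illustrative only):

    import org.apache.kylin.common.KylinConfig;

    public class MetricsCacheSettings {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // Expire-after-write window for cached query metrics; default is now 300s.
            int expireSeconds = config.getKylinMetricsCacheExpireSeconds();
            // Cap on the number of cached QueryExecutionMetrics entries.
            int maxEntries = config.getKylinMetricsCacheMaxEntries();
            System.out.println("expire=" + expireSeconds + "s, maxEntries=" + maxEntries);
        }
    }

Both values can be overridden via kylin.metrics.query-cache.expire-seconds and kylin.metrics.query-cache.max-entries, as the test kylin.properties further down does.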
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
index ed2430c..a0efe64 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -36,42 +36,62 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class QuerySparkMetrics {
     private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
-    private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+    private static ScheduledExecutorService scheduledExecutor = null;
+    private static QuerySparkMetrics instance =
+            new QuerySparkMetrics(new QuerySparkMetricsRemovalListener());
     private static final int sparkMetricsNum = 10;
 
     private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
 
-    private QuerySparkMetrics() {
+    // default removal listener
+    private static class QuerySparkMetricsRemovalListener implements RemovalListener<String,
+            QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+            try {
+                updateMetricsToReservoir(notification.getKey(), notification.getValue());
+                logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+                        notification.getKey(), notification.getCause());
+            } catch (Exception e) {
+                logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+                        notification.getKey(), notification.getCause());
+            }
+        }
+    }
+
+    private QuerySparkMetrics(RemovalListener removalListener) {
+        if (queryExecutionMetricsMap != null) {
+            queryExecutionMetricsMap.cleanUp();
+            queryExecutionMetricsMap = null;
+        }
         queryExecutionMetricsMap = CacheBuilder.newBuilder()
                 .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
                 .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                         TimeUnit.SECONDS)
-                .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
-                    @Override
-                    public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
-                        try {
-                            updateMetricsToReservoir(notification.getKey(), notification.getValue());
-                            logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
-                                    notification.getKey(), notification.getCause());
-                        } catch(Exception e) {
-                            logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
-                                    notification.getKey(), notification.getCause());
-                        }
-                    }
-                }).build();
-
-        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                queryExecutionMetricsMap.cleanUp();
-            }
-        },
+                .removalListener(removalListener).build();
+
+        if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) {
+            scheduledExecutor.shutdown();
+            scheduledExecutor = null;
+        }
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                queryExecutionMetricsMap.cleanUp();
+            }
+        },
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+    }
 
+    // only for test case
+    public static void init(RemovalListener removalListener) {
+        instance = new QuerySparkMetrics(removalListener);
     }
 
     public static QuerySparkMetrics getInstance() {
@@ -159,19 +179,11 @@ public class QuerySparkMetrics {
         return queryExecutionMetricsMap.getIfPresent(queryId);
     }
 
-    public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
-        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
-        if (queryExecutionMetrics != null) {
-            queryExecutionMetrics.setRealization(realizationName);
-            queryExecutionMetrics.setRealizationType(realizationType);
-            queryExecutionMetrics.setCuboidIds(cuboidIds);
-        }
-    }
-
     /**
      * report query related metrics
      */
-    public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+    public static void updateMetricsToReservoir(String queryId,
+                                                QueryExecutionMetrics queryExecutionMetrics) {
         if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
             return;
         }
@@ -186,7 +198,8 @@ public class QuerySparkMetrics {
             setSparkExecutionWrapper(queryExecutionMetricsEvent,
                     queryExecutionMetrics.getSparderName(), queryExecutionMetrics.getExecutionId(),
                     queryExecutionMetrics.getRealization(),
-                    queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+                    queryExecutionMetrics.getRealizationTypes(),
+                    queryExecutionMetrics.getCuboidIds(),
                     queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
 
             setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
@@ -244,7 +257,8 @@ public class QuerySparkMetrics {
                 queryExecutionMetricsList[i] += sparkJobMetricsList[i];
             }
         }
-        setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+        setSparkExecutionMetrics(queryExecutionMetricsEvent,
+                queryExecutionMetrics.getEndTime() - queryExecutionMetrics.getStartTime(),
                 queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
                 queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
                 queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
@@ -264,8 +278,10 @@ public class QuerySparkMetrics {
         metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
     }
 
-    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
-            String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName,
+                                                 long executionId, String realizationName,
+                                                 String realizationType, String cuboidIds,
+                                                 long startTime, long endTime) {
         metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
         metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
         metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
@@ -365,7 +381,7 @@ public class QuerySparkMetrics {
         private long executionDuration;
         private String queryId;
         private String realization;
-        private int realizationType;
+        private String realizationTypes;
         private String cuboidIds;
         private long startTime;
         private long endTime;
@@ -512,16 +528,16 @@ public class QuerySparkMetrics {
             return realization;
         }
 
-        public int getRealizationType() {
-            return realizationType;
+        public String getRealizationTypes() {
+            return realizationTypes;
         }
 
         public void setRealization(String realization) {
             this.realization = realization;
         }
 
-        public void setRealizationType(int realizationType) {
-            this.realizationType = realizationType;
+        public void setRealizationTypes(String realizationTypes) {
+            this.realizationTypes = realizationTypes;
        }
 
         public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
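The refactoring above pulls the eviction callback out of the constructor so that QuerySparkMetrics.init(RemovalListener) can swap it: on eviction (expiry or size overflow) the default listener flushes the entry to the metrics reservoir, while a test can install its own probe. A sketch of such a probe, mirroring what the new test case installs (the class name is illustrative):

    import org.apache.kylin.metrics.QuerySparkMetrics;
    import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
    import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;

    public class EvictionProbe implements
            RemovalListener<String, QuerySparkMetrics.QueryExecutionMetrics> {

        @Override
        public void onRemoval(
                RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> notification) {
            // Fires when an entry expires or is pushed out by the size cap;
            // a real test would assert on the evicted metrics here.
            System.out.println("evicted query " + notification.getKey()
                    + " due to " + notification.getCause());
        }

        public static void install() {
            QuerySparkMetrics.init(new EvictionProbe());
        }
    }

One caveat: init() replaces the singleton, so any metrics still sitting in the previous instance's cache are dropped without being reported, which is presumably why the method is flagged "only for test case".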
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index ca9543e..d17ce8c 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -159,4 +159,7 @@ kylin.source.jdbc.connection-url=jdbc:h2:mem:db
 kylin.source.jdbc.user=
 kylin.source.jdbc.pass=
 
-kylin.query.auto-sparder-context=false
\ No newline at end of file
+kylin.query.auto-sparder-context=false
+
+kylin.metrics.query-cache.expire-seconds=5
+kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
index 6237235..84097e6 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -98,7 +98,7 @@ class SparderMetricsListener() extends SparkListener with Logging {
     val stageMetrics = stageInfo.taskMetrics
     val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
     sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeCpuTime,
-      stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+      stageMetrics.executorDeserializeTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
       stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime, stageMetrics.memoryBytesSpilled,
       stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
     queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 876ae08..feaf3a3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.init;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
 import org.apache.kylin.rest.metrics.QueryMetricsFacade;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class InitialTaskManager implements InitializingBean {
 
     private void runInitialTasks() {
         // init metrics system for kylin
+        QuerySparkMetrics.getInstance();
         QueryMetricsFacade.init();
         QueryMetrics2Facade.init();
"CACHE" : "PARQUET"); - + queryExecutionMetrics.setRealization(sqlResponse.getCube()); + queryExecutionMetrics.setRealizationTypes(sqlResponse.getRealizationTypes()); + queryExecutionMetrics.setCuboidIds(sqlResponse.getCuboidIds()); queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration()); queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount()); queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index d5d57ed..37578ec 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -48,6 +48,8 @@ public class SQLResponse implements Serializable { protected String cuboidIds; + protected String realizationTypes; + // if not select query, only return affected row count protected int affectedRowCount; @@ -286,6 +288,14 @@ public class SQLResponse implements Serializable { this.lazyQueryStartTime = lazyQueryStartTime; } + public String getRealizationTypes() { + return realizationTypes; + } + + public void setRealizationTypes(String realizationTypes) { + this.realizationTypes = realizationTypes; + } + @JsonIgnore public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() { try { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java index 768876f..392fac2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java @@ -32,7 +32,6 @@ import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; -import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.request.PrepareSqlRequest; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index f020d6b..7296636 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -1180,6 +1180,7 @@ public class QueryService extends BasicService { List<String> realizations = Lists.newLinkedList(); StringBuilder cubeSb = new StringBuilder(); StringBuilder cuboidIdsSb = new StringBuilder(); + StringBuilder realizationTypeSb = new StringBuilder(); StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: "); QueryContext queryContext = QueryContextFacade.current(); if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' @@ -1191,6 +1192,7 @@ public class QueryService extends BasicService { if (cubeSb.length() > 0) { cubeSb.append(","); } + cubeSb.append(ctx.realization.getCanonicalName()); Cuboid cuboid = ctx.storageContext.getCuboid(); if (cuboid != null) { //Some queries do not involve cuboid, e.g. 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index 768876f..392fac2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
-import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index f020d6b..7296636 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1180,6 +1180,7 @@ public class QueryService extends BasicService {
         List<String> realizations = Lists.newLinkedList();
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder cuboidIdsSb = new StringBuilder();
+        StringBuilder realizationTypeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
         QueryContext queryContext = QueryContextFacade.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
@@ -1191,6 +1192,7 @@ public class QueryService extends BasicService {
                     if (cubeSb.length() > 0) {
                         cubeSb.append(",");
                     }
+                    cubeSb.append(ctx.realization.getCanonicalName());
                     Cuboid cuboid = ctx.storageContext.getCuboid();
                     if (cuboid != null) {
                         //Some queries do not involve cuboid, e.g. lookup table query
@@ -1199,25 +1201,25 @@ public class QueryService extends BasicService {
                         }
                         cuboidIdsSb.append(cuboid.getId());
                     }
-                    cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
                     realizationName = ctx.realization.getName();
                     realizationType = ctx.realization.getStorageType();
+                    if (realizationTypeSb.length() > 0) {
+                        realizationTypeSb.append(",");
+                    }
+                    realizationTypeSb.append(realizationType);
                     realizations.add(realizationName);
                 }
-                QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName,
-                        realizationType, cuboidIdsSb.toString());
             }
-
-
         }
         logger.info(logSb.toString());
 
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setCuboidIds(cuboidIdsSb.toString());
+        response.setRealizationTypes(realizationTypeSb.toString());
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 : queryContext.getScanFiles());
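With setQueryRealization gone, the realization info now rides on the SQLResponse: QueryService joins the storage type of every matched OLAPContext into one comma-separated string instead of keeping only the last one, and QueryMetricsFacade copies it into the metrics entry. A small sketch of the resulting format (values are illustrative):

    import java.util.StringJoiner;

    public class RealizationTypesFormat {
        public static void main(String[] args) {
            // One storage type per OLAPContext, comma-joined, matching the
            // StringBuilder logic in QueryService above.
            int[] storageTypes = { 4, 2 }; // illustrative values only
            StringJoiner realizationTypes = new StringJoiner(",");
            for (int type : storageTypes) {
                realizationTypes.add(String.valueOf(type));
            }
            // Prints "4,2"; this string lands in SQLResponse.setRealizationTypes()
            // and ultimately in the REALIZATION_TYPE metrics column.
            System.out.println(realizationTypes);
        }
    }

This is also why QueryExecutionMetrics.realizationType changed from int to the String field realizationTypes, and why the Hive column type for REALIZATION_TYPE flips from HINT to HSTRING at the end of this diff.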
System.setProperty("kylin.server.query-metrics-enabled", "true"); + QuerySparkMetrics.init(new QuerySparkMetricsTestRemovalListener()); + QueryMetricsFacade.init(); + + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql("select * from TEST_KYLIN_FACT"); + sqlRequest.setProject("default"); + + String queryId1 = "1"; + generateSparkMetrics(queryId1); + + SQLResponse sqlResponse = new SQLResponse(); + sqlResponse.setDuration(10); + sqlResponse.setCube("test_cube"); + sqlResponse.setCuboidIds("12345"); + sqlResponse.setRealizationTypes("4"); + sqlResponse.setIsException(false); + sqlResponse.setTotalScanCount(100); + List<String> list1 = new ArrayList<>(); + list1.add("111"); + list1.add("112"); + List<String> list2 = new ArrayList<>(); + list2.add("111"); + list2.add("112"); + List<List<String>> results = new ArrayList<>(); + results.add(list1); + results.add(list2); + sqlResponse.setResults(results); + sqlResponse.setStorageCacheUsed(true); + + QueryMetricsFacade.updateMetrics(queryId1, sqlRequest, sqlResponse); + + Thread.sleep(3000); + + updateSparkMetrics(queryId1); + + Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) != null); + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount")); + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount")); + Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount")); + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount")); + + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps")); + Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime")); + Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime")); + Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime")); + + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps")); + Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime")); + Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime")); + Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime")); + + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps")); + Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime")); + Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime")); + Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime")); + + String queryId2 = "2"; + generateSparkMetrics(queryId2); + + Thread.sleep(3000); + + Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) == null); + Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) != null); + + updateSparkMetrics(queryId2); + + SQLResponse sqlResponse2 = new SQLResponse(); + sqlResponse2.setDuration(10); + sqlResponse2.setCube("test_cube"); + sqlResponse2.setIsException(true); + + QueryMetricsFacade.updateMetrics(queryId2, sqlRequest, sqlResponse2); + + Thread.sleep(5000); + + Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount")); + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount")); + Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount")); + + Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) == null); + Assert.assertEquals(2, 
+
+    public void generateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long executionStartTime = 0;
+        long jobStartTime = 0;
+        long submitTime = 0;
+
+        if (id == 1) {
+            executionStartTime = 1610609727972L;
+            jobStartTime = 1610609772880L;
+            submitTime = 1610610480942L;
+        } else if (id == 2) {
+            executionStartTime = 1610609876545L;
+            jobStartTime = 1610609901699L;
+            submitTime = 16106105016542L;
+        }
+        QuerySparkMetrics.getInstance().onJobStart(queryId, "test-sparder_" + id, id,
+                executionStartTime, id, jobStartTime);
+        QuerySparkMetrics.getInstance().onSparkStageStart(queryId, id, id, "stageType_" + id,
+                submitTime);
+    }
+
+    public void updateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long jobEndTime = 0;
+        long executionEndTime = 0;
+        boolean jobIsSuccess = true;
+
+        QuerySparkMetrics.SparkStageMetrics queryStageMetrics = new QuerySparkMetrics.SparkStageMetrics();
+        if (id == 1) {
+            jobEndTime = 1610610734401L;
+            executionEndTime = 1610612655793L;
+            jobIsSuccess = true;
+            queryStageMetrics.setMetrics(10000, 10, 10, 100, 10, 1, 10, 1000, 1000, 100);
+        } else if (id == 2) {
+            jobEndTime = 1610610750397L;
+            executionEndTime = 1610612685275L;
+            jobIsSuccess = false;
+            queryStageMetrics.setMetrics(20000, 20, 20, 200, 20, 2, 20, 2000, 2000, 200);
+        }
+        QuerySparkMetrics.getInstance().updateSparkStageMetrics(queryId, id, id, true,
+                queryStageMetrics);
+        QuerySparkMetrics.getInstance().updateSparkJobMetrics(queryId, id, jobEndTime, jobIsSuccess);
+        QuerySparkMetrics.getInstance().updateExecutionMetrics(queryId, executionEndTime);
+    }
+
+    public static void verifyQuerySparkMetrics(String queryId,
+                                               QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics) {
+        sparkMetricsReportCnt.getAndIncrement();
+        Assert.assertTrue(StringUtils.isNotBlank(queryId));
+        Assert.assertTrue(queryExecutionMetrics != null);
+        // verify
+        int id = Integer.valueOf(queryId);
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null);
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null);
+        Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id);
+        Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getJobId(), id);
+
+        if (id == 1) {
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "CACHE");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), "4");
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), "12345");
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609727972L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612655793L);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609772880L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610734401L);
+
+            //SparkStage metrics
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageType(), "stageType_" + id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 10000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 1);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 100);
+        } else if (id == 2) {
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "PARQUET");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), null);
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), null);
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609876545L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612685275L);
+            Assert.assertFalse(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609901699L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610750397L);
+
+            //SparkStage metrics
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 20000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 200);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 200);
+        }
+    }
+
+    private static class QuerySparkMetricsTestRemovalListener implements RemovalListener<String,
+            QuerySparkMetrics.QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> notification) {
+            try {
+                verifyQuerySparkMetrics(notification.getKey(), notification.getValue());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
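The test above leans on fixed Thread.sleep() calls plus the 5-second expire-after-write configured in the test kylin.properties, which can be flaky on slow machines. A possible hardening, sketched here as a hypothetical helper (not part of this commit), would poll for the eviction instead:

    // Hypothetical test helper: wait until the cache has evicted the entry,
    // instead of sleeping for a fixed interval.
    private static void awaitEviction(String queryId, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            // Expired entries are no longer returned by the getIfPresent-backed lookup.
            if (QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId) == null) {
                return;
            }
            Thread.sleep(200L);
        }
        throw new AssertionError("metrics for query " + queryId + " were never evicted");
    }

Called as awaitEviction(queryId1, 10000L) in place of a bare Thread.sleep(3000), it keeps the same assertions while removing the timing assumption.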
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 2d82e02..f3e7168 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -467,7 +467,7 @@ public class CubeDescCreator {
         desc.setDimensions(dimensionDescList);
         desc.setMeasures(measureDescList);
         desc.setRowkey(rowKeyDesc);
-        //desc.setHbaseMapping(hBaseMapping);
+        desc.setHbaseMapping(hBaseMapping);
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 73e493e..af40392 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -130,7 +130,7 @@ public class HiveTableCreator {
         columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));