This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit af3a1b55ae1bbfb157ce9ec63a73651865c4c94a
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Jan 5 18:49:19 2021 +0800

    KYLIN-4857 Refactor system cube for kylin4
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  20 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    | 741 +++++++++++++++++++++
 .../metrics/property/QuerySparkExecutionEnum.java  |  80 +++
 .../kylin/metrics/property/QuerySparkJobEnum.java  |  66 ++
 .../metrics/property/QuerySparkStageEnum.java      |  71 ++
 .../org/apache/spark/sql/SparderContext.scala      |   6 +
 .../spark/sql/metrics/SparderMetricsListener.scala | 144 ++++
 .../kylin/rest/metrics/QueryMetricsFacade.java     | 164 +----
 .../org/apache/kylin/rest/service/CubeService.java |   6 +-
 .../kylin/rest/service/DashboardService.java       |  21 +-
 .../apache/kylin/rest/service/QueryService.java    |  16 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       |   6 +-
 .../tool/metrics/systemcube/CubeDescCreator.java   | 156 +++--
 .../metrics/systemcube/CubeInstanceCreator.java    |  14 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  | 136 ++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  20 +-
 .../tool/metrics/systemcube/ModelCreator.java      | 138 ++--
 .../kylin/tool/metrics/systemcube/SCCreator.java   |  18 +-
 .../systemcube/streamingv2/KafkaTopicCreator.java  |   6 +-
 19 files changed, 1450 insertions(+), 379 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dc91a7f..36950ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2349,20 +2349,28 @@ public abstract class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQuery() {
-        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
+    public String getKylinMetricsSubjectQueryExecution() {
+        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY_EXECUTION") + "_" + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryCube() {
-        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_CUBE") + "_"
+    public String getKylinMetricsSubjectQuerySparkJob() {
+        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_SPARK_JOB") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryRpcCall() {
-        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_RPC") + "_"
+    public String getKylinMetricsSubjectQuerySparkStage() {
+        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_SPARK_STAGE") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
+    public int getKylinMetricsCacheExpireSeconds() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+    }
+
+    public int getKylinMetricsCacheMaxEntries() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.max-entries", "10000"));
+    }
+
     public Map<String, String> getKylinMetricsConf() {
         return getPropertiesByPrefix("kylin.metrics.");
     }
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
new file mode 100644
index 0000000..ed2430c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metrics; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; +import org.apache.kylin.metrics.property.QuerySparkJobEnum; +import org.apache.kylin.metrics.property.QuerySparkStageEnum; +import org.apache.kylin.shaded.com.google.common.cache.Cache; +import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder; +import org.apache.kylin.shaded.com.google.common.cache.RemovalListener; +import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class QuerySparkMetrics { + private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class); + private static final QuerySparkMetrics instance = new QuerySparkMetrics(); + private static final int sparkMetricsNum = 10; + private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap; + + private QuerySparkMetrics() { + queryExecutionMetricsMap = CacheBuilder.newBuilder() + .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries()) + .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), + TimeUnit.SECONDS) + .removalListener(new RemovalListener<String, QueryExecutionMetrics>() { + @Override + public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) { + try { + updateMetricsToReservoir(notification.getKey(), notification.getValue()); + logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful", + notification.getKey(), notification.getCause()); + } catch(Exception e) { + logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed", + notification.getKey(), notification.getCause()); + } + } + }).build(); + + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + queryExecutionMetricsMap.cleanUp(); + } + }, + KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), + KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS); + + } + + public static QuerySparkMetrics getInstance() { + return instance; + } + + public void onJobStart(String queryId, String sparderName, long executionId, long executionStartTime, int jobId, + long jobStartTime) { + 
QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics == null) { + queryExecutionMetrics = new QueryExecutionMetrics(); + ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap = Maps.newConcurrentMap(); + queryExecutionMetrics.setQueryId(queryId); + queryExecutionMetrics.setSparderName(sparderName); + queryExecutionMetrics.setExecutionId(executionId); + queryExecutionMetrics.setStartTime(executionStartTime); + queryExecutionMetrics.setSparkJobMetricsMap(sparkJobMetricsMap); + queryExecutionMetricsMap.put(queryId, queryExecutionMetrics); + } + SparkJobMetrics sparkJobMetrics = new SparkJobMetrics(); + sparkJobMetrics.setExecutionId(executionId); + sparkJobMetrics.setJobId(jobId); + sparkJobMetrics.setStartTime(jobStartTime); + + ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap = Maps.newConcurrentMap(); + sparkJobMetrics.setSparkStageMetricsMap(sparkStageMetricsMap); + + queryExecutionMetrics.getSparkJobMetricsMap().put(jobId, sparkJobMetrics); + } + + public void onSparkStageStart(String queryId, int jobId, int stageId, String stageType, long submitTime) { + QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) { + SparkStageMetrics sparkStageMetrics = new SparkStageMetrics(); + sparkStageMetrics.setStageId(stageId); + sparkStageMetrics.setStageType(stageType); + sparkStageMetrics.setSubmitTime(submitTime); + queryExecutionMetrics.getSparkJobMetricsMap().get(jobId).getSparkStageMetricsMap().put(stageId, + sparkStageMetrics); + } + } + + public void updateSparkStageMetrics(String queryId, int jobId, int stageId, boolean isSuccess, + SparkStageMetrics sparkStageMetricsEnd) { + QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics != null) { + SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId); + if (sparkJobMetrics != null) { + SparkStageMetrics sparkStageMetrics = sparkJobMetrics.getSparkStageMetricsMap().get(stageId); + if (sparkStageMetrics != null) { + sparkStageMetrics.setSuccess(isSuccess); + sparkStageMetrics.setMetrics(sparkStageMetricsEnd.getResultSize(), + sparkStageMetricsEnd.getExecutorDeserializeTime(), + sparkStageMetricsEnd.getExecutorDeserializeCpuTime(), + sparkStageMetricsEnd.getExecutorRunTime(), sparkStageMetricsEnd.getExecutorCpuTime(), + sparkStageMetricsEnd.getJvmGCTime(), sparkStageMetricsEnd.getResultSerializationTime(), + sparkStageMetricsEnd.getMemoryBytesSpilled(), sparkStageMetricsEnd.getDiskBytesSpilled(), + sparkStageMetricsEnd.getPeakExecutionMemory()); + } + } + } + } + + public void updateSparkJobMetrics(String queryId, int jobId, long jobEndTime, boolean isSuccess) { + QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) { + SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId); + sparkJobMetrics.setEndTime(jobEndTime); + sparkJobMetrics.setSuccess(isSuccess); + } + } + + public void updateExecutionMetrics(String queryId, long executionEndTime) { + QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics != null) { + queryExecutionMetrics.setEndTime(executionEndTime); + } + } + + 
public Cache<String, QueryExecutionMetrics> getQueryExecutionMetricsMap() { + return queryExecutionMetricsMap; + } + + public QueryExecutionMetrics getQueryExecutionMetrics(String queryId) { + return queryExecutionMetricsMap.getIfPresent(queryId); + } + + public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) { + QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId); + if (queryExecutionMetrics != null) { + queryExecutionMetrics.setRealization(realizationName); + queryExecutionMetrics.setRealizationType(realizationType); + queryExecutionMetrics.setCuboidIds(cuboidIds); + } + } + + /** + * report query related metrics + */ + public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) { + if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) { + return; + } + if (queryExecutionMetrics != null) { + RecordEvent queryExecutionMetricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryExecution()); + + setQueryWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getUser(), + queryExecutionMetrics.getSqlIdCode(), queryExecutionMetrics.getQueryType(), + queryId, queryExecutionMetrics.getProject(), + queryExecutionMetrics.getException()); + + setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(), + queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(), + queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(), + queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime()); + + setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(), + queryExecutionMetrics.getTotalScanCount(), queryExecutionMetrics.getTotalScanBytes(), + queryExecutionMetrics.getResultCount()); + + long[] queryExecutionMetricsList = new long[sparkMetricsNum]; + for (Map.Entry<Integer, QuerySparkMetrics.SparkJobMetrics> sparkJobMetricsEntry : queryExecutionMetrics + .getSparkJobMetricsMap().entrySet()) { + RecordEvent sparkJobMetricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkJob()); + + setSparkJobWrapper(sparkJobMetricsEvent, queryExecutionMetrics.getProject(), + queryId, queryExecutionMetrics.getExecutionId(), + sparkJobMetricsEntry.getValue().getJobId(), sparkJobMetricsEntry.getValue().getStartTime(), + sparkJobMetricsEntry.getValue().getEndTime(), sparkJobMetricsEntry.getValue().isSuccess()); + + long[] sparkJobMetricsList = new long[sparkMetricsNum]; + for (Map.Entry<Integer, QuerySparkMetrics.SparkStageMetrics> sparkStageMetricsEntry : sparkJobMetricsEntry + .getValue().getSparkStageMetricsMap().entrySet()) { + RecordEvent sparkStageMetricsEvent = new TimedRecordEvent( + KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkStage()); + QuerySparkMetrics.SparkStageMetrics sparkStageMetrics = sparkStageMetricsEntry.getValue(); + setStageWrapper(sparkStageMetricsEvent, queryExecutionMetrics.getProject(), null, + queryId, queryExecutionMetrics.getExecutionId(), + sparkJobMetricsEntry.getValue().getJobId(), sparkStageMetrics.getStageId(), + sparkStageMetrics.getSubmitTime(), sparkStageMetrics.isSuccess()); + setStageMetrics(sparkStageMetricsEvent, sparkStageMetrics.getResultSize(), + sparkStageMetrics.getExecutorDeserializeTime(), + sparkStageMetrics.getExecutorDeserializeCpuTime(), sparkStageMetrics.getExecutorRunTime(), + 
sparkStageMetrics.getExecutorCpuTime(), sparkStageMetrics.getJvmGCTime(), + sparkStageMetrics.getResultSerializationTime(), sparkStageMetrics.getMemoryBytesSpilled(), + sparkStageMetrics.getDiskBytesSpilled(), sparkStageMetrics.getPeakExecutionMemory()); + //Update spark stage level metrics + MetricsManager.getInstance().update(sparkStageMetricsEvent); + + sparkJobMetricsList[0] += sparkStageMetrics.getResultSize(); + sparkJobMetricsList[1] += sparkStageMetrics.getExecutorDeserializeTime(); + sparkJobMetricsList[2] += sparkStageMetrics.getExecutorDeserializeCpuTime(); + sparkJobMetricsList[3] += sparkStageMetrics.getExecutorRunTime(); + sparkJobMetricsList[4] += sparkStageMetrics.getExecutorCpuTime(); + sparkJobMetricsList[5] += sparkStageMetrics.getJvmGCTime(); + sparkJobMetricsList[6] += sparkStageMetrics.getResultSerializationTime(); + sparkJobMetricsList[7] += sparkStageMetrics.getMemoryBytesSpilled(); + sparkJobMetricsList[8] += sparkStageMetrics.getDiskBytesSpilled(); + sparkJobMetricsList[9] += sparkStageMetrics.getPeakExecutionMemory(); + } + setSparkJobMetrics(sparkJobMetricsEvent, sparkJobMetricsList[0], sparkJobMetricsList[1], + sparkJobMetricsList[2], sparkJobMetricsList[3], sparkJobMetricsList[4], sparkJobMetricsList[5], + sparkJobMetricsList[6], sparkJobMetricsList[7], sparkJobMetricsList[8], sparkJobMetricsList[9]); + //Update spark job level metrics + MetricsManager.getInstance().update(sparkJobMetricsEvent); + + for (int i = 0; i < sparkMetricsNum; i++) { + queryExecutionMetricsList[i] += sparkJobMetricsList[i]; + } + } + setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(), + queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2], + queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5], + queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8], + queryExecutionMetricsList[9]); + //Update execution level metrics + MetricsManager.getInstance().update(queryExecutionMetricsEvent); + } + } + + private static void setQueryWrapper(RecordEvent metricsEvent, String user, long sqlIdCode, String queryType, + String queryId, String project, String exception) { + metricsEvent.put(QuerySparkExecutionEnum.USER.toString(), user); + metricsEvent.put(QuerySparkExecutionEnum.ID_CODE.toString(), sqlIdCode); + metricsEvent.put(QuerySparkExecutionEnum.TYPE.toString(), queryType); + metricsEvent.put(QuerySparkExecutionEnum.QUERY_ID.toString(), queryId); + metricsEvent.put(QuerySparkExecutionEnum.PROJECT.toString(), project); + metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception); + } + + private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId, + String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) { + metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName); + metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId); + metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName); + metricsEvent.put(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), realizationType); + metricsEvent.put(QuerySparkExecutionEnum.CUBOID_IDS.toString(), cuboidIds); + metricsEvent.put(QuerySparkExecutionEnum.START_TIME.toString(), startTime); + metricsEvent.put(QuerySparkExecutionEnum.END_TIME.toString(), endTime); + } + + private static void setQueryMetrics(RecordEvent metricsEvent, long 
sqlDuration, long totalScanCount, + long totalScanBytes, long resultCount) { + metricsEvent.put(QuerySparkExecutionEnum.TIME_COST.toString(), sqlDuration); + metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), totalScanCount); + metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), totalScanBytes); + metricsEvent.put(QuerySparkExecutionEnum.RESULT_COUNT.toString(), resultCount); + } + + private static void setSparkExecutionMetrics(RecordEvent metricsEvent, long executionDuration, long resultSize, + long executorDeserializeTime, long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, + long jvmGCTime, long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, + long peakExecutionMemory) { + metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), executionDuration); + + metricsEvent.put(QuerySparkExecutionEnum.RESULT_SIZE.toString(), resultSize); + metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime); + metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime); + metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime); + metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime); + metricsEvent.put(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), jvmGCTime); + metricsEvent.put(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime); + metricsEvent.put(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled); + metricsEvent.put(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled); + metricsEvent.put(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory); + } + + private static void setSparkJobMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime, + long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime, + long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) { + metricsEvent.put(QuerySparkJobEnum.RESULT_SIZE.toString(), resultSize); + metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime); + metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime); + metricsEvent.put(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime); + metricsEvent.put(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime); + metricsEvent.put(QuerySparkJobEnum.JVM_GC_TIME.toString(), jvmGCTime); + metricsEvent.put(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime); + metricsEvent.put(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled); + metricsEvent.put(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled); + metricsEvent.put(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory); + } + + private static void setStageMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime, + long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime, + long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) { + metricsEvent.put(QuerySparkStageEnum.RESULT_SIZE.toString(), resultSize); + metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime); + 
metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime); + metricsEvent.put(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime); + metricsEvent.put(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime); + metricsEvent.put(QuerySparkStageEnum.JVM_GC_TIME.toString(), jvmGCTime); + metricsEvent.put(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime); + metricsEvent.put(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled); + metricsEvent.put(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled); + metricsEvent.put(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory); + } + + private static void setStageWrapper(RecordEvent metricsEvent, String projectName, String realizationName, + String queryId, long executionId, int jobId, int stageId, long submitTime, boolean isSuccess) { + metricsEvent.put(QuerySparkStageEnum.PROJECT.toString(), projectName); + metricsEvent.put(QuerySparkStageEnum.REALIZATION.toString(), realizationName); + metricsEvent.put(QuerySparkStageEnum.QUERY_ID.toString(), queryId); + metricsEvent.put(QuerySparkStageEnum.EXECUTION_ID.toString(), executionId); + metricsEvent.put(QuerySparkStageEnum.JOB_ID.toString(), jobId); + metricsEvent.put(QuerySparkStageEnum.STAGE_ID.toString(), stageId); + metricsEvent.put(QuerySparkStageEnum.SUBMIT_TIME.toString(), submitTime); + metricsEvent.put(QuerySparkStageEnum.IF_SUCCESS.toString(), isSuccess); + } + + private static void setSparkJobWrapper(RecordEvent metricsEvent, String projectName, String queryId, + long executionId, int jobId, long startTime, long endTime, boolean isSuccess) { + metricsEvent.put(QuerySparkJobEnum.PROJECT.toString(), projectName); + metricsEvent.put(QuerySparkJobEnum.QUERY_ID.toString(), queryId); + metricsEvent.put(QuerySparkJobEnum.EXECUTION_ID.toString(), executionId); + metricsEvent.put(QuerySparkJobEnum.JOB_ID.toString(), jobId); + metricsEvent.put(QuerySparkJobEnum.START_TIME.toString(), startTime); + metricsEvent.put(QuerySparkJobEnum.END_TIME.toString(), endTime); + metricsEvent.put(QuerySparkJobEnum.IF_SUCCESS.toString(), isSuccess); + } + + public static class QueryExecutionMetrics implements Serializable { + private long sqlIdCode; + private String user; + private String queryType; + private String project; + private String exception; + private long executionId; + private String sparderName; + private long executionDuration; + private String queryId; + private String realization; + private int realizationType; + private String cuboidIds; + private long startTime; + private long endTime; + private ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap; + + private long sqlDuration; + private long totalScanCount; + private long totalScanBytes; + private int resultCount; + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public int getResultCount() { + return resultCount; + } + + public long getSqlDuration() { + return sqlDuration; + } + + public long getTotalScanBytes() { + return totalScanBytes; + } + + public long getTotalScanCount() { + return totalScanCount; + } + + public void setResultCount(int resultCount) { + this.resultCount = resultCount; + } + + public void setSqlDuration(long sqlDuration) { + this.sqlDuration = sqlDuration; + } + + public void setTotalScanBytes(long totalScanBytes) { + this.totalScanBytes = totalScanBytes; + } + + public void setTotalScanCount(long 
totalScanCount) { + this.totalScanCount = totalScanCount; + } + + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + public void setProject(String project) { + this.project = project; + } + + public String getProject() { + return project; + } + + public String getQueryType() { + return queryType; + } + + public long getSqlIdCode() { + return sqlIdCode; + } + + public void setQueryType(String queryType) { + this.queryType = queryType; + } + + public void setSqlIdCode(long sqlIdCode) { + this.sqlIdCode = sqlIdCode; + } + + public long getEndTime() { + return endTime; + } + + public long getStartTime() { + return startTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public String getQueryId() { + return queryId; + } + + public long getExecutionDuration() { + return executionDuration; + } + + public void setExecutionDuration(long executionDuration) { + this.executionDuration = executionDuration; + } + + public ConcurrentMap<Integer, SparkJobMetrics> getSparkJobMetricsMap() { + return sparkJobMetricsMap; + } + + public long getExecutionId() { + return executionId; + } + + public String getSparderName() { + return sparderName; + } + + public void setExecutionId(long executionId) { + this.executionId = executionId; + } + + public void setSparderName(String sparderName) { + this.sparderName = sparderName; + } + + public String getCuboidIds() { + return cuboidIds; + } + + public void setCuboidIds(String cuboidIds) { + this.cuboidIds = cuboidIds; + } + + public String getRealization() { + return realization; + } + + public int getRealizationType() { + return realizationType; + } + + public void setRealization(String realization) { + this.realization = realization; + } + + public void setRealizationType(int realizationType) { + this.realizationType = realizationType; + } + + public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) { + this.sparkJobMetricsMap = sparkJobMetricsMap; + } + } + + public static class SparkJobMetrics implements Serializable { + private long executionId; + private int jobId; + private long startTime; + private long endTime; + private boolean isSuccess; + private ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap; + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setExecutionId(long executionId) { + this.executionId = executionId; + } + + public long getExecutionId() { + return executionId; + } + + public void setSparkStageMetricsMap(ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap) { + this.sparkStageMetricsMap = sparkStageMetricsMap; + } + + public void setJobId(int jobId) { + this.jobId = jobId; + } + + public void setSuccess(boolean success) { + isSuccess = success; + } + + public boolean isSuccess() { + return isSuccess; + } + + public ConcurrentMap<Integer, SparkStageMetrics> getSparkStageMetricsMap() { + return sparkStageMetricsMap; + } + + public int getJobId() { + return jobId; + } + } + + public static class SparkStageMetrics implements Serializable { + private int stageId; + private String 
stageType; + private long submitTime; + private long endTime; + private boolean isSuccess; + private long resultSize; + private long executorDeserializeTime; + private long executorDeserializeCpuTime; + private long executorRunTime; + private long executorCpuTime; + private long jvmGCTime; + private long resultSerializationTime; + private long memoryBytesSpilled; + private long diskBytesSpilled; + private long peakExecutionMemory; + + public void setMetrics(long resultSize, long executorDeserializeTime, long executorDeserializeCpuTime, + long executorRunTime, long executorCpuTime, long jvmGCTime, long resultSerializationTime, + long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) { + this.resultSize = resultSize; + this.executorDeserializeTime = executorDeserializeTime; + this.executorDeserializeCpuTime = executorDeserializeCpuTime; + this.executorRunTime = executorRunTime; + this.executorCpuTime = executorCpuTime; + this.jvmGCTime = jvmGCTime; + this.resultSerializationTime = resultSerializationTime; + this.memoryBytesSpilled = memoryBytesSpilled; + this.diskBytesSpilled = diskBytesSpilled; + this.peakExecutionMemory = peakExecutionMemory; + } + + public long getEndTime() { + return endTime; + } + + public long getSubmitTime() { + return submitTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public void setSubmitTime(long submitTime) { + this.submitTime = submitTime; + } + + public boolean isSuccess() { + return isSuccess; + } + + public void setSuccess(boolean success) { + isSuccess = success; + } + + public void setStageType(String stageType) { + this.stageType = stageType; + } + + public void setStageId(int stageId) { + this.stageId = stageId; + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + } + + public void setPeakExecutionMemory(long peakExecutionMemory) { + this.peakExecutionMemory = peakExecutionMemory; + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + } + + public void setJvmGCTime(long jvmGCTime) { + this.jvmGCTime = jvmGCTime; + } + + public void setExecutorRunTime(long executorRunTime) { + this.executorRunTime = executorRunTime; + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + } + + public void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime) { + this.executorDeserializeCpuTime = executorDeserializeCpuTime; + } + + public void setExecutorCpuTime(long executorCpuTime) { + this.executorCpuTime = executorCpuTime; + } + + public void setDiskBytesSpilled(long diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + } + + public String getStageType() { + return stageType; + } + + public long getResultSize() { + return resultSize; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public long getPeakExecutionMemory() { + return peakExecutionMemory; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getJvmGCTime() { + return jvmGCTime; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getExecutorDeserializeTime() { + return executorDeserializeTime; + } + + public long getExecutorDeserializeCpuTime() { + return executorDeserializeCpuTime; + } + + public 
long getExecutorCpuTime() { + return executorCpuTime; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public int getStageId() { + return stageId; + } + } +} diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java new file mode 100644 index 0000000..b390dd0 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metrics.property; + +import org.apache.kylin.shaded.com.google.common.base.Strings; + +public enum QuerySparkExecutionEnum { + ID_CODE("QUERY_HASH_CODE"), + SQL("QUERY_SQL"), + PROJECT("PROJECT"), + TYPE("QUERY_TYPE"), + REALIZATION("REALIZATION"), + REALIZATION_TYPE("REALIZATION_TYPE"), + CUBOID_IDS("CUBOID_IDS"), + QUERY_ID("QUERY_ID"), + EXECUTION_ID("EXECUTION_ID"), + USER("KUSER"), + SPARDER_NAME("SPARDER_NAME"), + EXCEPTION("EXCEPTION"), + START_TIME("START_TIME"), + END_TIME("END_TIME"), + + + TIME_COST("QUERY_TIME_COST"), + TOTAL_SCAN_COUNT("TOTAL_SCAN_COUNT"), + TOTAL_SCAN_BYTES("TOTAL_SCAN_BYTES"), + RESULT_COUNT("RESULT_COUNT"), + + EXECUTION_DURATION("EXECUTION_DURATION"), + RESULT_SIZE("RESULT_SIZE"), + EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"), + EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"), + EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"), + EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"), + JVM_GC_TIME("JVM_GC_TIME"), + RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"), + MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"), + DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"), + PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY"); + + private final String propertyName; + + QuerySparkExecutionEnum(String name) { + this.propertyName = name; + } + + public static QuerySparkExecutionEnum getByName(String name) { + if (Strings.isNullOrEmpty(name)) { + throw new IllegalArgumentException("Name should not be empty"); + } + for (QuerySparkExecutionEnum property : QuerySparkExecutionEnum.values()) { + if (property.propertyName.equalsIgnoreCase(name)) { + return property; + } + } + + return null; + } + + @Override + public String toString() { + return propertyName; + } +} diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java new file mode 100644 index 0000000..0f041c2 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metrics.property; + +import org.apache.kylin.shaded.com.google.common.base.Strings; + +public enum QuerySparkJobEnum { + PROJECT("PROJECT"), + QUERY_ID("QUERY_ID"), + EXECUTION_ID("EXECUTION_ID"), + JOB_ID("JOB_ID"), + START_TIME("START_TIME"), + END_TIME("END_TIME"), + IF_SUCCESS("IF_SUCCESS"), + + RESULT_SIZE("RESULT_SIZE"), + EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"), + EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"), + EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"), + EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"), + JVM_GC_TIME("JVM_GC_TIME"), + RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"), + MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"), + DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"), + PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY"); + + private final String propertyName; + + QuerySparkJobEnum(String name) { + this.propertyName = name; + } + + public static QuerySparkJobEnum getByName(String name) { + if (Strings.isNullOrEmpty(name)) { + throw new IllegalArgumentException("Name should not be empty"); + } + for (QuerySparkJobEnum property : QuerySparkJobEnum.values()) { + if (property.propertyName.equalsIgnoreCase(name)) { + return property; + } + } + + return null; + } + + @Override + public String toString() { + return propertyName; + } +} diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java new file mode 100644 index 0000000..dcb0f42 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.metrics.property; + +import org.apache.kylin.shaded.com.google.common.base.Strings; + +/** + * Definition of Metrics dimension and measure for Spark stage + */ +public enum QuerySparkStageEnum { + PROJECT("PROJECT"), + QUERY_ID("QUERY_ID"), + EXECUTION_ID("EXECUTION_ID"), + JOB_ID("JOB_ID"), + STAGE_ID("STAGE_ID"), + SUBMIT_TIME("SUBMIT_TIME"), + REALIZATION("REALIZATION"), + CUBOID_ID("CUBOID_NAME"), + IF_SUCCESS("IF_SUCCESS"), + + RESULT_SIZE("RESULT_SIZE"), + EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"), + EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"), + EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"), + EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"), + JVM_GC_TIME("JVM_GC_TIME"), + RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"), + MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"), + DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"), + PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY"); + + private final String propertyName; + + QuerySparkStageEnum(String name) { + this.propertyName = name; + } + + public static QuerySparkStageEnum getByName(String name) { + if (Strings.isNullOrEmpty(name)) { + throw new IllegalArgumentException("Name should not be empty"); + } + for (QuerySparkStageEnum property : QuerySparkStageEnum.values()) { + if (property.propertyName.equalsIgnoreCase(name)) { + return property; + } + } + + return null; + } + + @Override + public String toString() { + return propertyName; + } +} diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala index 9e89a62..0d25dba 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala @@ -37,6 +37,7 @@ import org.apache.kylin.query.monitor.SparderContextCanary import org.apache.kylin.spark.classloader.ClassLoaderUtils import org.apache.spark.{SparkConf, SparkContext, SparkEnv} import org.apache.spark.sql.execution.datasource.KylinSourceStrategy +import org.apache.spark.sql.metrics.SparderMetricsListener import org.apache.spark.utils.YarnInfoFetcherUtils // scalastyle:off @@ -150,6 +151,11 @@ object SparderContext extends Logging { .enableHiveSupport() .getOrCreateKylinSession() } + if (kylinConf.isKylinMetricsReporterForQueryEnabled) { + val appStatusListener = new SparderMetricsListener() + sparkSession.sparkContext.addSparkListener(appStatusListener) + logInfo("Query metrics reporter is enabled, sparder metrics listener is added.") + } spark = sparkSession val appid = sparkSession.sparkContext.applicationId // write application id to file 'sparkappid' diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala new file mode 100644 index 0000000..6237235 --- /dev/null +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metrics + +import org.apache.kylin.metrics.QuerySparkMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} + +class SparderMetricsListener() extends SparkListener with Logging { + + var stageJobMap: Map[Int, Int] = Map() + var jobExecutionMap: Map[Int, QueryInformation] = Map() + var executionInformationMap: Map[Long, ExecutionInformation] = Map() + + val queryExecutionMetrics = QuerySparkMetrics.getInstance() + + override def onJobStart(event: SparkListenerJobStart): Unit = { + val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) + val sparderName = event.properties.getProperty("spark.app.name") + val kylinQueryId = event.properties.getProperty("kylin.query.id") + + if (executionIdString == null || kylinQueryId == null) { + logInfo(s"The job ${event.jobId} is not a query job.") + return + } + + val executionId = executionIdString.toLong + + if (executionInformationMap.apply(executionId).sparderName == null) { + val executionInformation = new ExecutionInformation(kylinQueryId, + executionInformationMap.apply(executionId).executionStartTime, sparderName) + executionInformationMap += (executionId -> executionInformation) + } + + jobExecutionMap += (event.jobId -> new QueryInformation(kylinQueryId, executionId)) + + val stages = event.stageInfos.iterator + while (stages.hasNext) { + val stage: StageInfo = stages.next() + stageJobMap += (stage.stageId -> event.jobId) + } + + queryExecutionMetrics.onJobStart(kylinQueryId, sparderName, executionId, + executionInformationMap.apply(executionId).executionStartTime, event.jobId, event.time) + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + if (jobExecutionMap.contains(event.jobId)) { + val isSuccess = event.jobResult match { + case JobSucceeded => true + case _ => false + } + queryExecutionMetrics.updateSparkJobMetrics(jobExecutionMap.apply(event.jobId).queryId, event.jobId, event.time, + isSuccess) + logInfo(s"The job ${event.jobId} has completed and the relevant metrics are updated to the cache") + jobExecutionMap -= event.jobId + } + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val queryId = event.properties.getProperty("kylin.query.id") + val stageId = event.stageInfo.stageId + + if (stageJobMap.contains(stageId)) { + val submitTime = event.stageInfo.submissionTime match { + case Some(x) => x + case None => -1 + } + queryExecutionMetrics.onSparkStageStart(queryId, stageJobMap.apply(stageId), stageId, event.stageInfo.name, submitTime) + } + } + + override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { + val stageInfo = event.stageInfo + if (stageJobMap.contains(stageInfo.stageId) && 
jobExecutionMap.contains(stageJobMap.apply(stageInfo.stageId))) { + val isSuccess = stageInfo.getStatusString match { + case "succeeded" => true + case _ => false + } + val stageMetrics = stageInfo.taskMetrics + val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics + sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeCpuTime, + stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime, + stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime, + stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory) + queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId, + stageJobMap.apply(stageInfo.stageId), stageInfo.stageId, isSuccess, sparkStageMetrics) + stageJobMap -= stageInfo.stageId + + logInfo(s"The stage ${event.stageInfo.stageId} has completed and the relevant metrics are updated to the cache") + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SparkListenerSQLExecutionStart => onQueryExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onQueryExecutionEnd(e) + case _ => // Ignore + } + } + + private def onQueryExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + executionInformationMap += (event.executionId -> new ExecutionInformation(null, event.time, null)) + } + + private def onQueryExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + val executionInformation = executionInformationMap.apply(event.executionId) + queryExecutionMetrics.updateExecutionMetrics(executionInformation.queryId, event.time) + executionInformationMap -= event.executionId + logInfo(s"QueryExecution ${event.executionId} is completed at ${event.time} " + + s"and the relevant metrics are updated to the cache") + } +} + +// ============================ + +class ExecutionInformation( + var queryId: String, + var executionStartTime: Long, + var sparderName: String + ) + +class QueryInformation( + val queryId: String, + val executionId: Long + ) diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index f51bf08..7a8f2d7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -20,7 +20,6 @@ package org.apache.kylin.rest.metrics; import java.nio.charset.Charset; import java.util.Locale; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; @@ -28,23 +27,15 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextFacade; -import org.apache.kylin.metrics.MetricsManager; -import org.apache.kylin.metrics.lib.impl.RecordEvent; -import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; -import org.apache.kylin.metrics.property.QueryCubePropertyEnum; -import org.apache.kylin.metrics.property.QueryPropertyEnum; -import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; -import org.apache.kylin.query.enumerator.OLAPQuery; +import org.apache.kylin.metrics.QuerySparkMetrics; import org.apache.kylin.rest.request.SQLRequest; 
import org.apache.kylin.rest.response.SQLResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.security.core.context.SecurityContextHolder; import org.apache.kylin.shaded.com.google.common.hash.HashFunction; import org.apache.kylin.shaded.com.google.common.hash.Hashing; +import org.springframework.security.core.context.SecurityContextHolder; /** * The entrance of metrics features. @@ -70,9 +61,9 @@ public class QueryMetricsFacade { return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong(); } - public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse) { + public static void updateMetrics(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) { updateMetricsToLocal(sqlRequest, sqlResponse); - updateMetricsToReservoir(sqlRequest, sqlResponse); + updateMetricsToCache(queryId, sqlRequest, sqlResponse); } private static void updateMetricsToLocal(SQLRequest sqlRequest, SQLResponse sqlResponse) { @@ -89,71 +80,27 @@ public class QueryMetricsFacade { update(getQueryMetrics(cubeMetricName), sqlResponse); } - /** - * report query related metrics - */ - private static void updateMetricsToReservoir(SQLRequest sqlRequest, SQLResponse sqlResponse) { - if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) { - return; - } + private static void updateMetricsToCache(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) { String user = SecurityContextHolder.getContext().getAuthentication().getName(); if (user == null) { user = "unknown"; } - for (QueryContext.RPCStatistics entry : QueryContextFacade.current().getRpcStatisticsList()) { - RecordEvent rpcMetricsEvent = new TimedRecordEvent( - KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); - setRPCWrapper(rpcMetricsEvent, // - norm(sqlRequest.getProject()), entry.getRealizationName(), entry.getRpcServer(), - entry.getException()); - setRPCStats(rpcMetricsEvent, // - entry.getCallTimeMs(), entry.getSkippedRows(), entry.getScannedRows(), entry.getReturnedRows(), - entry.getAggregatedRows()); - //For update rpc level related metrics - MetricsManager.getInstance().update(rpcMetricsEvent); - } - for (QueryContext.CubeSegmentStatisticsResult contextEntry : sqlResponse.getCubeSegmentStatisticsList()) { - RecordEvent queryMetricsEvent = new TimedRecordEvent( - KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery()); - setQueryWrapper(queryMetricsEvent, // - user, sqlRequest.getSql(), sqlResponse.isStorageCacheUsed() ? 
"CACHE" : contextEntry.getQueryType(), - norm(sqlRequest.getProject()), contextEntry.getRealization(), contextEntry.getRealizationType(), - sqlResponse.getThrowable()); - - long totalStorageReturnCount = 0L; - if (contextEntry.getQueryType().equalsIgnoreCase(OLAPQuery.EnumeratorTypeEnum.OLAP.name())) { - for (Map<String, QueryContext.CubeSegmentStatistics> cubeEntry : contextEntry.getCubeSegmentStatisticsMap() - .values()) { - for (QueryContext.CubeSegmentStatistics segmentEntry : cubeEntry.values()) { - RecordEvent cubeSegmentMetricsEvent = new TimedRecordEvent( - KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube()); - setCubeWrapper(cubeSegmentMetricsEvent, // - norm(sqlRequest.getProject()), segmentEntry.getCubeName(), segmentEntry.getSegmentName(), - segmentEntry.getSourceCuboidId(), segmentEntry.getTargetCuboidId(), - segmentEntry.getFilterMask()); - - setCubeStats(cubeSegmentMetricsEvent, // - segmentEntry.getCallCount(), segmentEntry.getCallTimeSum(), segmentEntry.getCallTimeMax(), - segmentEntry.getStorageSkippedRows(), segmentEntry.getStorageScannedRows(), - segmentEntry.getStorageReturnedRows(), segmentEntry.getStorageAggregatedRows(), - segmentEntry.isIfSuccess(), 1.0 / cubeEntry.size()); - - totalStorageReturnCount += segmentEntry.getStorageReturnedRows(); - //For update cube segment level related query metrics - MetricsManager.getInstance().update(cubeSegmentMetricsEvent); - } - } - } else { - if (!sqlResponse.getIsException()) { - totalStorageReturnCount = sqlResponse.getResults().size(); - } - } - setQueryStats(queryMetricsEvent, // - sqlResponse.getDuration(), sqlResponse.getResults() == null ? 0 : sqlResponse.getResults().size(), - totalStorageReturnCount); - //For update query level metrics - MetricsManager.getInstance().update(queryMetricsEvent); + QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics = QuerySparkMetrics.getInstance() + .getQueryExecutionMetricsMap().getIfPresent(queryId); + if (queryExecutionMetrics != null) { + queryExecutionMetrics.setUser(user); + queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql())); + queryExecutionMetrics.setProject(norm(sqlRequest.getProject())); + queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : "PARQUET"); + + queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration()); + queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount()); + queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes()); + queryExecutionMetrics.setResultCount(sqlResponse.getResults() == null ? 0 : sqlResponse.getResults().size()); + + queryExecutionMetrics.setException(sqlResponse.getThrowable() == null ? "NULL" : + sqlResponse.getThrowable().getClass().getName()); } } @@ -161,77 +108,6 @@ public class QueryMetricsFacade { return project.toUpperCase(Locale.ROOT); } - private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName, - String rpcServer, Throwable throwable) { - metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName); - metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName); - metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer); - metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), - throwable == null ? 
"NULL" : throwable.getClass().getName()); - } - - private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount, - long returnCount, long aggrCount) { - metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs); - metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter - metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server - metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server - metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor - metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor - } - - private static void setCubeWrapper(RecordEvent metricsEvent, String projectName, String cubeName, - String segmentName, long sourceCuboidId, long targetCuboidId, long filterMask) { - metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), projectName); - metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName); - metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), segmentName); - metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), sourceCuboidId); - metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), targetCuboidId); - metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), sourceCuboidId == targetCuboidId); - metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), filterMask); - } - - private static void setCubeStats(RecordEvent metricsEvent, long callCount, long callTimeSum, long callTimeMax, - long skipCount, long scanCount, long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) { - metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), callCount); - metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), callTimeSum); - metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), callTimeMax); - metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), skipCount); - metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), scanCount); - metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), returnCount); - metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); - metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), aggrCount); - metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), ifSuccess); - metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit); - } - - private static void setQueryWrapper(RecordEvent metricsEvent, String user, String sql, String queryType, - String projectName, String realizationName, int realizationType, Throwable throwable) { - metricsEvent.put(QueryPropertyEnum.USER.toString(), user); - metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), getSqlHashCode(sql)); - metricsEvent.put(QueryPropertyEnum.SQL.toString(), sql); - metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType); - metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName); - metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), realizationName); - metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), realizationType); - metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(), - throwable == null ? 
"NULL" : throwable.getClass().getName()); - } - - private static void setQueryStats(RecordEvent metricsEvent, long callTimeMs, long returnCountByCalcite, - long returnCountByStorage) { - metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs); - metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite); - metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), returnCountByStorage); - long countAggrAndFilter = returnCountByStorage - returnCountByCalcite; - if (countAggrAndFilter < 0) { - countAggrAndFilter = 0; - logger.warn(returnCountByStorage + " rows returned by storage less than " + returnCountByCalcite - + " rows returned by calcite"); - } - metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), countAggrAndFilter); - } - private static void update(QueryMetrics queryMetrics, SQLResponse sqlResponse) { try { incrQueryCount(queryMetrics, sqlResponse); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 63cb4d8..6b60ab8 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -1042,7 +1042,7 @@ public class CubeService extends BasicService implements InitializingBean { String cuboidColumn = isCuboidSource ? QueryCubePropertyEnum.CUBOID_SOURCE.toString() : QueryCubePropertyEnum.CUBOID_TARGET.toString(); String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(); - String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube()); + String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob()); String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ")" // + " from " + table// + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" // @@ -1057,7 +1057,7 @@ public class CubeService extends BasicService implements InitializingBean { String cuboidTgt = QueryCubePropertyEnum.CUBOID_TARGET.toString(); String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString(); String returnCount = QueryCubePropertyEnum.RETURN_COUNT.toString(); - String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube()); + String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob()); String sql = "select " + cuboidSource + ", " + cuboidTgt + ", avg(" + aggCount + "), avg(" + returnCount + ")"// + " from " + table // + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" // @@ -1070,7 +1070,7 @@ public class CubeService extends BasicService implements InitializingBean { public Map<Long, Long> getCuboidQueryMatchCount(String cubeName) { String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(); - String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube()); + String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob()); String sql = "select " + cuboidSource + ", sum(" + hitMeasure + ")" // + " from " + table // + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" 
// diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java index f622b89..768876f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java @@ -31,7 +31,8 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; -import org.apache.kylin.metrics.property.QueryPropertyEnum; +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.request.PrepareSqlRequest; @@ -119,7 +120,7 @@ public class DashboardService extends BasicService { Map<String, String> filterMap = getBaseFilterMap(CategoryEnum.QUERY, projectName, startTime, endTime); filterMap.putAll(getCubeFilterMap(CategoryEnum.QUERY, cubeName)); return createPrepareSqlRequest(null, metrics, - getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery()), filterMap); + getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution()), filterMap); }; public PrepareSqlRequest getJobMetricsSQLRequest(String startTime, String endTime, String projectName, @@ -143,7 +144,7 @@ public class DashboardService extends BasicService { if (categoryEnum == CategoryEnum.QUERY) { dimensionSQL = new String[] { QueryDimensionEnum.valueOf(dimension).toSQL() }; metricSQL = new String[] { QueryMetricEnum.valueOf(metric).toSQL() }; - table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery()); + table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution()); } else if (categoryEnum == CategoryEnum.JOB) { dimensionSQL = new String[] { JobDimensionEnum.valueOf(dimension).toSQL() }; metricSQL = new String[] { JobMetricEnum.valueOf(metric).toSQL() }; @@ -217,10 +218,10 @@ public class DashboardService extends BasicService { HashMap<String, String> filterMap = new HashMap<>(); if (category == CategoryEnum.QUERY) { - filterMap.put(QueryPropertyEnum.EXCEPTION.toString() + " = ?", "NULL"); + filterMap.put(QuerySparkExecutionEnum.EXCEPTION.toString() + " = ?", "NULL"); if (!Strings.isNullOrEmpty(cubeName)) { - filterMap.put(QueryPropertyEnum.REALIZATION + " = ?", cubeName); + filterMap.put(QuerySparkExecutionEnum.REALIZATION + " = ?", cubeName); } } else if (category == CategoryEnum.JOB && !Strings.isNullOrEmpty(cubeName)) { HybridInstance hybridInstance = getHybridManager().getHybridInstance(cubeName); @@ -299,8 +300,8 @@ public class DashboardService extends BasicService { } private enum QueryDimensionEnum { - PROJECT(QueryPropertyEnum.PROJECT.toString()), // - CUBE(QueryPropertyEnum.REALIZATION.toString()), // + PROJECT(QuerySparkExecutionEnum.PROJECT.toString()), // + CUBE(QuerySparkExecutionEnum.REALIZATION.toString()), // DAY(TimePropertyEnum.DAY_DATE.toString()), // WEEK(TimePropertyEnum.WEEK_BEGIN_DATE.toString()), // MONTH(TimePropertyEnum.MONTH.toString()); @@ -336,9 +337,9 @@ public class DashboardService extends BasicService { private enum QueryMetricEnum { QUERY_COUNT("count(*)"), // - AVG_QUERY_LATENCY("avg(" + 
QueryPropertyEnum.TIME_COST.toString() + ")"), // - MAX_QUERY_LATENCY("max(" + QueryPropertyEnum.TIME_COST.toString() + ")"), // - MIN_QUERY_LATENCY("min(" + QueryPropertyEnum.TIME_COST.toString() + ")"); + AVG_QUERY_LATENCY("avg(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")"), // + MAX_QUERY_LATENCY("max(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")"), // + MIN_QUERY_LATENCY("min(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")"); private final String sql; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 855174b..f020d6b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -68,6 +68,7 @@ import org.apache.kylin.cache.cachemanager.MemcachedCacheManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryContextFacade; +import org.apache.kylin.metrics.QuerySparkMetrics; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.persistence.ResourceStore; @@ -120,6 +121,7 @@ import org.apache.kylin.rest.util.SQLResponseSignatureUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.apache.kylin.storage.hybrid.HybridManager; +import org.apache.spark.sql.SparderContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -468,9 +470,14 @@ public class QueryService extends BasicService { } sqlResponse.setDuration(queryContext.getAccumulatedMillis()); + if (QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId()) != null) { + String sqlTraceUrl = SparderContext.appMasterTrackURL() + "/SQL/execution/?id=" + + QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId()).getExecutionId(); + sqlResponse.setTraceUrl(sqlTraceUrl); + } logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse); try { - recordMetric(sqlRequest, sqlResponse); + recordMetric(queryContext.getQueryId(), sqlRequest, sqlResponse); } catch (Throwable th) { logger.warn("Write metric error.", th); } @@ -585,8 +592,8 @@ public class QueryService extends BasicService { checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); } - protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException { - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + protected void recordMetric(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException { + QueryMetricsFacade.updateMetrics(queryId, sqlRequest, sqlResponse); QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse); } @@ -1200,7 +1207,8 @@ public class QueryService extends BasicService { realizations.add(realizationName); } - queryContext.setContextRealization(ctx.id, realizationName, realizationType); + QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName, + realizationType, cuboidIdsSb.toString()); } diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java index 10e5d8b..fbacc78 100644 --- 
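// ---------------------------------------------------------------------------
// Illustrative sketch only: how the Spark SQL trace URL set in QueryService
// above is assembled. In the change it comes from
// SparderContext.appMasterTrackURL() plus the execution id held in the cached
// QueryExecutionMetrics for this query id; the tracking URL and id used in
// main() are made-up example values.
// ---------------------------------------------------------------------------
class TraceUrlSketch {
    static String traceUrl(String appMasterTrackUrl, long executionId) {
        // Same concatenation pattern as the new QueryService code path.
        return appMasterTrackUrl + "/SQL/execution/?id=" + executionId;
    }

    public static void main(String[] args) {
        // e.g. http://rm-host:8088/proxy/application_0001/SQL/execution/?id=12
        System.out.println(traceUrl("http://rm-host:8088/proxy/application_0001", 12L));
    }
}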
a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java +++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java @@ -71,7 +71,7 @@ public class QueryMetricsTest extends ServiceTestBase { sqlResponse.setResults(results); sqlResponse.setStorageCacheUsed(true); - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse); Thread.sleep(2000); @@ -100,7 +100,7 @@ public class QueryMetricsTest extends ServiceTestBase { sqlResponse2.setCube("test_cube"); sqlResponse2.setIsException(true); - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse2); + QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse2); Thread.sleep(2000); @@ -146,7 +146,7 @@ public class QueryMetricsTest extends ServiceTestBase { sqlResponse.setCubeSegmentStatisticsList(context.getCubeSegmentStatisticsResultList()); - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse); Thread.sleep(2000); diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java index 729dabb..2d82e02 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java @@ -43,10 +43,9 @@ import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; -import org.apache.kylin.metrics.property.QueryCubePropertyEnum; -import org.apache.kylin.metrics.property.QueryPropertyEnum; -import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; - +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; +import org.apache.kylin.metrics.property.QuerySparkJobEnum; +import org.apache.kylin.metrics.property.QuerySparkStageEnum; import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.shaded.com.google.common.collect.Sets; @@ -54,11 +53,11 @@ import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; public class CubeDescCreator { - public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()); + public static CubeDesc generateKylinCubeDescForMetricsQueryExecution(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution()); //Set for dimensions - List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery(); + List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryExecution(); dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); @@ -68,39 +67,49 @@ public class CubeDescCreator { } //Set for measures - List<String> measures = ModelCreator.getMeasuresForMetricsQuery(); - measures.remove(QueryPropertyEnum.ID_CODE.toString()); + List<String> measures = ModelCreator.getMeasuresForMetricsQueryExecution(); + measures.remove(QuerySparkExecutionEnum.ID_CODE.toString()); List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1); - List<Pair<String, 
String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuery(); + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryExecution(); Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); for (Pair<String, String> entry : measureTypeList) { measureTypeMap.put(entry.getFirst(), entry.getSecond()); } measureDescList.add(getMeasureCount()); - measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(), - measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString()))); + measureDescList.add(getMeasureMin(QuerySparkExecutionEnum.TIME_COST.toString(), + measureTypeMap.get(QuerySparkExecutionEnum.TIME_COST.toString()))); for (String measure : measures) { measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); } - measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString())); - measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString())); + measureDescList.add(getMeasureHLL(QuerySparkExecutionEnum.ID_CODE.toString())); + measureDescList.add(getMeasurePercentile(QuerySparkExecutionEnum.TIME_COST.toString())); //Set for row key RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.USER.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.USER.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.PROJECT.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.REALIZATION.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.PROJECT.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.CUBOID_IDS.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.EXCEPTION.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.TYPE.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.EXCEPTION.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.SPARDER_NAME.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.TYPE.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.QUERY_ID.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.START_TIME.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.END_TIME.toString(), idx + 1); idx++; rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1); idx++; @@ -109,11 +118,18 @@ public class CubeDescCreator { rowKeyDesc.setRowkeyColumns(rowKeyColDescs); //Set for aggregation group - String[][] hierarchy_dims = new 
String[2][]; + String[][] hierarchy_dims = new String[4][]; hierarchy_dims[0] = getTimeHierarchy(); - hierarchy_dims[1] = new String[2]; - hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString(); - hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString(); + hierarchy_dims[1] = new String[3]; + hierarchy_dims[1][0] = QuerySparkExecutionEnum.REALIZATION_TYPE.toString(); + hierarchy_dims[1][1] = QuerySparkExecutionEnum.REALIZATION.toString(); + hierarchy_dims[1][2] = QuerySparkExecutionEnum.CUBOID_IDS.toString(); + hierarchy_dims[2] = new String[2]; + hierarchy_dims[2][0] = QuerySparkExecutionEnum.START_TIME.toString(); + hierarchy_dims[2][1] = QuerySparkExecutionEnum.END_TIME.toString(); + hierarchy_dims[3] = new String[2]; + hierarchy_dims[3][0] = QuerySparkExecutionEnum.SPARDER_NAME.toString(); + hierarchy_dims[3][1] = RecordEvent.RecordReserveKeyEnum.HOST.toString(); for (int i = 0; i < hierarchy_dims.length; i++) { hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); } @@ -135,15 +151,15 @@ public class CubeDescCreator { rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()); + public static CubeDesc generateKylinCubeDescForMetricsQuerySparkJob(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob()); //Set for dimensions - List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube(); + List<String> dimensions = ModelCreator.getDimensionsForMetricsQuerySparkJob(); dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString()); - dimensions.remove(QueryCubePropertyEnum.PROJECT.toString()); + dimensions.remove(QuerySparkJobEnum.PROJECT.toString()); List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size()); for (String dimensionName : dimensions) { @@ -151,10 +167,10 @@ public class CubeDescCreator { } //Set for measures - List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube(); + List<String> measures = ModelCreator.getMeasuresForMetricsQuerySparkJob(); List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2); - List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryCube(); + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob(); Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); for (Pair<String, String> entry : measureTypeList) { measureTypeMap.put(entry.getFirst(), entry.getSecond()); @@ -162,53 +178,43 @@ public class CubeDescCreator { measureDescList.add(getMeasureCount()); for (String measure : measures) { measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); - if (!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) { - measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); - } + measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); } //Set for row key RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); - 
rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBE.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.JOB_ID.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.SEGMENT.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.EXECUTION_ID.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.QUERY_ID.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.START_TIME.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.END_TIME.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1); - idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.IF_SUCCESS.toString(), idx + 1); idx++; RowKeyDesc rowKeyDesc = new RowKeyDesc(); rowKeyDesc.setRowkeyColumns(rowKeyColDescs); - //Set for aggregation group - String[] mandatory_dims = new String[] { QueryCubePropertyEnum.CUBE.toString() }; - mandatory_dims = refineColumnWithTable(tableName, mandatory_dims); - - String[][] hierarchy_dims = new String[1][]; + String[][] hierarchy_dims = new String[2][]; hierarchy_dims[0] = getTimeHierarchy(); + hierarchy_dims[1] = new String[3]; + hierarchy_dims[1][0] = QuerySparkJobEnum.QUERY_ID.toString(); + hierarchy_dims[1][1] = QuerySparkJobEnum.EXECUTION_ID.toString(); + hierarchy_dims[1][2] = QuerySparkJobEnum.JOB_ID.toString(); + for (int i = 0; i < hierarchy_dims.length; i++) { hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); } - String[][] joint_dims = new String[1][]; - joint_dims[0] = new String[] { QueryCubePropertyEnum.CUBOID_SOURCE.toString(), - QueryCubePropertyEnum.CUBOID_TARGET.toString() }; - for (int i = 0; i < joint_dims.length; i++) { - joint_dims[i] = refineColumnWithTable(tableName, joint_dims[i]); - } - SelectRule selectRule = new SelectRule(); - selectRule.mandatoryDims = mandatory_dims; + selectRule.mandatoryDims = new String[0]; selectRule.hierarchyDims = hierarchy_dims; - selectRule.jointDims = joint_dims; + selectRule.jointDims = new String[0][0]; AggregationGroup aggGroup = new AggregationGroup(); aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions)); @@ -222,11 +228,11 @@ public class CubeDescCreator { rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()); + public static CubeDesc generateKylinCubeDescForMetricsQuerySparkStage(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage()); //Set for dimensions - List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC(); + List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQuerySparkStage(); dimensions.remove(TimePropertyEnum.DAY_TIME.toString()); dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString()); @@ -236,10 +242,10 @@ public class CubeDescCreator { } //Set for measures - List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC(); + List<String> measures = ModelCreator.getMeasuresForMetricsQuerySparkStage(); List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1); - List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryRPC(); + List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage(); Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size()); for (Pair<String, String> entry : measureTypeList) { measureTypeMap.put(entry.getFirst(), entry.getSecond()); @@ -249,28 +255,42 @@ public class CubeDescCreator { measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure))); measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure))); } - measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString())); //Set for row key RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()]; int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs); - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.PROJECT.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.PROJECT.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.REALIZATION.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.CUBOID_ID.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.QUERY_ID.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.EXECUTION_ID.toString(), idx + 1); idx++; - rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1); + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.JOB_ID.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.STAGE_ID.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.IF_SUCCESS.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.SUBMIT_TIME.toString(), idx + 1); + idx++; + rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1); idx++; RowKeyDesc rowKeyDesc = new RowKeyDesc(); rowKeyDesc.setRowkeyColumns(rowKeyColDescs); //Set for aggregation group - String[][] hierarchy_dims = new String[1][]; + String[][] hierarchy_dims = new String[2][]; hierarchy_dims[0] = getTimeHierarchy(); + hierarchy_dims[1] = new String[4]; + hierarchy_dims[1][0] = QuerySparkStageEnum.QUERY_ID.toString(); + hierarchy_dims[1][1] = QuerySparkStageEnum.EXECUTION_ID.toString(); + hierarchy_dims[1][2] = QuerySparkStageEnum.JOB_ID.toString(); + hierarchy_dims[1][3] = 
QuerySparkStageEnum.STAGE_ID.toString(); for (int i = 0; i < hierarchy_dims.length; i++) { hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]); } @@ -447,7 +467,7 @@ public class CubeDescCreator { desc.setDimensions(dimensionDescList); desc.setMeasures(measureDescList); desc.setRowkey(rowKeyDesc); - desc.setHbaseMapping(hBaseMapping); + //desc.setHbaseMapping(hBaseMapping); desc.setNotifyList(Lists.<String> newArrayList()); desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString())); desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L }); diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java index 7d70bc2..e96da4d 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java @@ -34,7 +34,7 @@ public class CubeInstanceCreator { public static void main(String[] args) throws Exception { KylinConfig config = KylinConfig.getInstanceFromEnv(); - CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new MetricsSinkDesc()); + CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQueryExecution("ADMIN", config, new MetricsSinkDesc()); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); CubeManager.CUBE_SERIALIZER.serialize(cubeInstance, dout); @@ -43,21 +43,21 @@ public class CubeInstanceCreator { System.out.println(buf.toString("UTF-8")); } - public static CubeInstance generateKylinCubeInstanceForMetricsQuery(String owner, KylinConfig config, + public static CubeInstance generateKylinCubeInstanceForMetricsQueryExecution(String owner, KylinConfig config, MetricsSinkDesc sinkDesc) { - return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery())); + return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution())); } - public static CubeInstance generateKylinCubeInstanceForMetricsQueryCube(String owner, KylinConfig config, + public static CubeInstance generateKylinCubeInstanceForMetricsQuerySparkJob(String owner, KylinConfig config, MetricsSinkDesc sinkDesc) { return generateKylinCubeInstance(owner, - sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube())); + sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob())); } - public static CubeInstance generateKylinCubeInstanceForMetricsQueryRPC(String owner, KylinConfig config, + public static CubeInstance generateKylinCubeInstanceForMetricsQuerySparkStage(String owner, KylinConfig config, MetricsSinkDesc sinkDesc) { return generateKylinCubeInstance(owner, - sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall())); + sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage())); } public static CubeInstance generateKylinCubeInstanceForMetricsJob(String owner, KylinConfig config, diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java index 35d9efb..73e493e 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java @@ -29,10 
+29,9 @@ import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord; import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter; import org.apache.kylin.metrics.property.JobPropertyEnum; -import org.apache.kylin.metrics.property.QueryCubePropertyEnum; -import org.apache.kylin.metrics.property.QueryPropertyEnum; -import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; - +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; +import org.apache.kylin.metrics.property.QuerySparkJobEnum; +import org.apache.kylin.metrics.property.QuerySparkStageEnum; import org.apache.kylin.shaded.com.google.common.base.Strings; import org.apache.kylin.shaded.com.google.common.collect.Lists; @@ -97,18 +96,18 @@ public class HiveTableCreator { } public static String generateHiveTableSQLForMetricsQuery(KylinConfig config) { - String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuery()); - return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuery(), getPartitionKVsForHiveTable()); + String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryExecution()); + return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryExecution(), getPartitionKVsForHiveTable()); } public static String generateHiveTableSQLForMetricsQueryCUBE(KylinConfig config) { - String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryCube()); - return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryCube(), getPartitionKVsForHiveTable()); + String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkJob()); + return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuerySparkJob(), getPartitionKVsForHiveTable()); } public static String generateHiveTableSQLForMetricsQueryRPC(KylinConfig config) { - String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryRpcCall()); - return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryRPC(), getPartitionKVsForHiveTable()); + String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkStage()); + return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuerySparkStage(), getPartitionKVsForHiveTable()); } public static String generateHiveTableSQLForMetricsJob(KylinConfig config) { @@ -121,67 +120,94 @@ public class HiveTableCreator { return generateHiveTableSQL(tableName, getHiveColumnsForMetricsJobException(), getPartitionKVsForHiveTable()); } - public static List<Pair<String, String>> getHiveColumnsForMetricsQuery() { + public static List<Pair<String, String>> getHiveColumnsForMetricsQueryExecution() { List<Pair<String, String>> columns = Lists.newLinkedList(); - columns.add(new Pair<>(QueryPropertyEnum.ID_CODE.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.ID_CODE.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString())); columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryPropertyEnum.USER.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryPropertyEnum.PROJECT.toString(), 
HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryPropertyEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryPropertyEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString())); - columns.add(new Pair<>(QueryPropertyEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString())); - - columns.add(new Pair<>(QueryPropertyEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryPropertyEnum.TIME_COST.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.USER.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString())); + + columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.START_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.END_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + + columns.add(new Pair<>(QuerySparkExecutionEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.TIME_COST.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); + + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new 
Pair<>(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString())); columns.addAll(getTimeColumnsForMetrics()); return columns; } - public static List<Pair<String, String>> getHiveColumnsForMetricsQueryCube() { + public static List<Pair<String, String>> getHiveColumnsForMetricsQuerySparkJob() { List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.add(new Pair<>(QuerySparkJobEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString())); columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.CUBE.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.SEGMENT.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_TARGET.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.IF_MATCH.toString(), HiveTypeEnum.HBOOLEAN.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.FILTER_MASK.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString())); - - columns.add(new Pair<>(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), HiveTypeEnum.HDOUBLE.toString())); - - columns.add(new Pair<>(QueryCubePropertyEnum.CALL_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.TIME_SUM.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.TIME_MAX.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.START_TIME.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.END_TIME.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString())); + + columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + 
columns.add(new Pair<>(QuerySparkJobEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString())); columns.addAll(getTimeColumnsForMetrics()); return columns; } - public static List<Pair<String, String>> getHiveColumnsForMetricsQueryRPC() { + public static List<Pair<String, String>> getHiveColumnsForMetricsQuerySparkStage() { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.RPC_SERVER.toString(), HiveTypeEnum.HSTRING.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString())); - - columns.add(new Pair<>(QueryRPCPropertyEnum.CALL_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); - columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.STAGE_ID.toString(), HiveTypeEnum.HINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.SUBMIT_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.CUBOID_ID.toString(), HiveTypeEnum.HSTRING.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString())); + + columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.JVM_GC_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString())); + columns.add(new Pair<>(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString())); columns.addAll(getTimeColumnsForMetrics()); return columns; diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java index 9636811..6d70a7e 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java @@ -39,7 +39,7 @@ public class KylinTableCreator { public static void main(String[] args) throws Exception { KylinConfig config = KylinConfig.getInstanceFromEnv(); - TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new MetricsSinkDesc()); + TableDesc kylinTable = generateKylinTableForMetricsQueryExecution(config, new MetricsSinkDesc()); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); TableMetadataManager.TABLE_SERIALIZER.serialize(kylinTable, dout); @@ -48,25 +48,25 @@ public class KylinTableCreator { System.out.println(buf.toString("UTF-8")); } - public static TableDesc generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + public static TableDesc generateKylinTableForMetricsQueryExecution(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); - columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery()); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryExecution()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuery(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryExecution(), columns); } - public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + public static TableDesc generateKylinTableForMetricsQuerySparkJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); - columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube()); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkJob(), columns); } - public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + public static TableDesc generateKylinTableForMetricsQuerySparkStage(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); - columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC()); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return 
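// ---------------------------------------------------------------------------
// Illustrative sketch only: the column/type pairs collected in
// HiveTableCreator above feed a CREATE TABLE statement via
// generateHiveTableSQL(...), whose body is not part of this hunk. The
// rendering below is an approximation of that idea using plain Map entries
// instead of Kylin's Pair, with placeholder column names and types; it is not
// the actual generator.
// ---------------------------------------------------------------------------
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class HiveDdlSketch {
    static String createTableSql(String tableName, Map<String, String> columns) {
        String cols = columns.entrySet().stream()
                .map(e -> "  " + e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + cols + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("QUERY_ID", "string");    // placeholder names/types, not the enums' exact strings
        columns.put("EXECUTION_ID", "bigint");
        columns.put("STAGE_ID", "int");
        System.out.println(createTableSql("SOME_METRICS_STAGE_TABLE", columns));
    }
}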
generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkStage(), columns); } public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java index 429509a..8a36549 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java @@ -32,9 +32,9 @@ import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; -import org.apache.kylin.metrics.property.QueryCubePropertyEnum; -import org.apache.kylin.metrics.property.QueryPropertyEnum; -import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; +import org.apache.kylin.metrics.property.QuerySparkExecutionEnum; +import org.apache.kylin.metrics.property.QuerySparkJobEnum; +import org.apache.kylin.metrics.property.QuerySparkStageEnum; import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; import org.apache.kylin.shaded.com.google.common.collect.Lists; @@ -65,23 +65,23 @@ public class ModelCreator { public static DataModelDesc generateKylinModelForMetricsQuery(String owner, KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery()); - return generateKylinModel(owner, tableName, getDimensionsForMetricsQuery(), getMeasuresForMetricsQuery(), - getPartitionDesc(tableName)); + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryExecution()); + return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryExecution(), + getMeasuresForMetricsQueryExecution(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsQueryCube(String owner, KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube()); - return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryCube(), - getMeasuresForMetricsQueryCube(), getPartitionDesc(tableName)); + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkJob()); + return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkJob(), + getMeasuresForMetricsQuerySparkJob(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsQueryRPC(String owner, KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { - String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall()); - return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryRPC(), getMeasuresForMetricsQueryRPC(), - getPartitionDesc(tableName)); + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkStage()); + return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkStage(), + getMeasuresForMetricsQuerySparkStage(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsJob(String owner, KylinConfig kylinConfig, @@ -98,82 +98,106 @@ 
public class ModelCreator {
                 getMeasuresForMetricsJobException(), getPartitionDesc(tableName));
     }
 
-    public static List<String> getDimensionsForMetricsQuery() {
+    public static List<String> getDimensionsForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryPropertyEnum.USER.toString());
-        result.add(QueryPropertyEnum.PROJECT.toString());
-        result.add(QueryPropertyEnum.REALIZATION.toString());
-        result.add(QueryPropertyEnum.REALIZATION_TYPE.toString());
-        result.add(QueryPropertyEnum.TYPE.toString());
-        result.add(QueryPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.USER.toString());
+        result.add(QuerySparkExecutionEnum.PROJECT.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION_TYPE.toString());
+        result.add(QuerySparkExecutionEnum.CUBOID_IDS.toString());
+        result.add(QuerySparkExecutionEnum.TYPE.toString());
+        result.add(QuerySparkExecutionEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.SPARDER_NAME.toString());
+        result.add(QuerySparkExecutionEnum.QUERY_ID.toString());
+        result.add(QuerySparkExecutionEnum.START_TIME.toString());
+        result.add(QuerySparkExecutionEnum.END_TIME.toString());
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQuery() {
+    public static List<String> getMeasuresForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryPropertyEnum.ID_CODE.toString());
-        result.add(QueryPropertyEnum.TIME_COST.toString());
-        result.add(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.AGGR_FILTER_COUNT.toString());
-
+        result.add(QuerySparkExecutionEnum.ID_CODE.toString());
+        result.add(QuerySparkExecutionEnum.TIME_COST.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTION_DURATION.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString());
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryCube() {
+    public static List<String> getDimensionsForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryCubePropertyEnum.PROJECT.toString());
-        result.add(QueryCubePropertyEnum.CUBE.toString());
-        result.add(QueryCubePropertyEnum.SEGMENT.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_SOURCE.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_TARGET.toString());
-        result.add(QueryCubePropertyEnum.FILTER_MASK.toString());
-        result.add(QueryCubePropertyEnum.IF_MATCH.toString());
-        result.add(QueryCubePropertyEnum.IF_SUCCESS.toString());
+        result.add(QuerySparkJobEnum.QUERY_ID.toString());
+        result.add(QuerySparkJobEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkJobEnum.JOB_ID.toString());
+        result.add(QuerySparkJobEnum.PROJECT.toString());
+        result.add(QuerySparkJobEnum.START_TIME.toString());
+        result.add(QuerySparkJobEnum.END_TIME.toString());
+        result.add(QuerySparkJobEnum.IF_SUCCESS.toString());
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryCube() {
+    public static List<String> getMeasuresForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString());
-        result.add(QueryCubePropertyEnum.CALL_COUNT.toString());
-        result.add(QueryCubePropertyEnum.TIME_SUM.toString());
-        result.add(QueryCubePropertyEnum.TIME_MAX.toString());
-        result.add(QueryCubePropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryCubePropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkJobEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString());
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryRPC() {
+    public static List<String> getDimensionsForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryRPCPropertyEnum.PROJECT.toString());
-        result.add(QueryRPCPropertyEnum.REALIZATION.toString());
-        result.add(QueryRPCPropertyEnum.RPC_SERVER.toString());
-        result.add(QueryRPCPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkStageEnum.QUERY_ID.toString());
+        result.add(QuerySparkStageEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkStageEnum.JOB_ID.toString());
+        result.add(QuerySparkStageEnum.STAGE_ID.toString());
+        result.add(QuerySparkStageEnum.SUBMIT_TIME.toString());
+        result.add(QuerySparkStageEnum.PROJECT.toString());
+        result.add(QuerySparkStageEnum.REALIZATION.toString());
+        result.add(QuerySparkStageEnum.CUBOID_ID.toString());
+        result.add(QuerySparkStageEnum.IF_SUCCESS.toString());
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryRPC() {
+    public static List<String> getMeasuresForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryRPCPropertyEnum.CALL_TIME.toString());
-        result.add(QueryRPCPropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkStageEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString());
         return result;
     }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
index 17fde95..3322677 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
@@ -186,9 +186,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<TableDesc> generateKylinTableForSystemCube(MetricsSinkDesc sinkDesc) {
         List<TableDesc> result = Lists.newLinkedList();
-        result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQueryExecution(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkStage(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkDesc));
 
@@ -208,9 +208,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeDesc> generateKylinCubeDescForSystemCube(MetricsSinkDesc sinkDesc) {
         List<CubeDesc> result = Lists.newLinkedList();
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryExecution(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkStage(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkDesc));
 
@@ -219,9 +219,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, MetricsSinkDesc sinkDesc) {
         List<CubeInstance> result = Lists.newLinkedList();
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryExecution(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkJob(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkStage(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkDesc));
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
index c495919..50ab091 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
@@ -25,9 +25,9 @@ public class KafkaTopicCreator {
     public static String generateCreateCommand(KylinConfig config) {
         StringBuilder sb = new StringBuilder();
         String[] topics = new String[]{
-                config.getKylinMetricsSubjectQuery(),
-                config.getKylinMetricsSubjectQueryCube(),
-                config.getKylinMetricsSubjectQueryRpcCall(),
+                config.getKylinMetricsSubjectQueryExecution(),
+                config.getKylinMetricsSubjectQuerySparkJob(),
+                config.getKylinMetricsSubjectQuerySparkStage(),
                 config.getKylinMetricsSubjectJob(),
                 config.getKylinMetricsSubjectJobException()};
         for (String topic : topics) {
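For reviewers who want to see what the refactored system-cube models expose, the following is a minimal sketch, not part of the patch. It only calls the public static ModelCreator helpers changed above; the class name SystemCubeColumnsDump is hypothetical, and it assumes the Kylin tool module (package org.apache.kylin.tool.metrics.systemcube) is on the classpath.

import java.util.List;

import org.apache.kylin.tool.metrics.systemcube.ModelCreator;

public class SystemCubeColumnsDump {
    public static void main(String[] args) {
        // The old QUERY / QUERY_CUBE / QUERY_RPC subjects are replaced by
        // QUERY_EXECUTION / QUERY_SPARK_JOB / QUERY_SPARK_STAGE.
        dump("METRICS_QUERY_EXECUTION",
                ModelCreator.getDimensionsForMetricsQueryExecution(),
                ModelCreator.getMeasuresForMetricsQueryExecution());
        dump("METRICS_QUERY_SPARK_JOB",
                ModelCreator.getDimensionsForMetricsQuerySparkJob(),
                ModelCreator.getMeasuresForMetricsQuerySparkJob());
        dump("METRICS_QUERY_SPARK_STAGE",
                ModelCreator.getDimensionsForMetricsQuerySparkStage(),
                ModelCreator.getMeasuresForMetricsQuerySparkStage());
    }

    // Prints the dimension and measure column names a system-cube model carries.
    private static void dump(String subject, List<String> dimensions, List<String> measures) {
        System.out.println(subject + ": " + dimensions.size() + " dimensions, "
                + measures.size() + " measures");
        System.out.println("  dimensions = " + dimensions);
        System.out.println("  measures   = " + measures);
    }
}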
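Similarly, a small sketch (again not part of the patch; the class name PrintMetricsTopicCommands is hypothetical) can print the Kafka topic-creation command assembled by the refactored KafkaTopicCreator, which now covers the METRICS_QUERY_EXECUTION / METRICS_QUERY_SPARK_JOB / METRICS_QUERY_SPARK_STAGE subjects. The exact command text is produced by generateCreateCommand itself; its loop body is truncated in the hunk above, so nothing about the command syntax is assumed here.

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.tool.metrics.systemcube.streamingv2.KafkaTopicCreator;

public class PrintMetricsTopicCommands {
    public static void main(String[] args) {
        // Loads kylin.properties from the environment (e.g. KYLIN_HOME/conf).
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        // Prints the create commands for all five metrics topics.
        System.out.println(KafkaTopicCreator.generateCreateCommand(config));
    }
}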