This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit af3a1b55ae1bbfb157ce9ec63a73651865c4c94a
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Jan 5 18:49:19 2021 +0800

    KYLIN-4857 Refactor system cube for kylin4
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  20 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    | 741 +++++++++++++++++++++
 .../metrics/property/QuerySparkExecutionEnum.java  |  80 +++
 .../kylin/metrics/property/QuerySparkJobEnum.java  |  66 ++
 .../metrics/property/QuerySparkStageEnum.java      |  71 ++
 .../org/apache/spark/sql/SparderContext.scala      |   6 +
 .../spark/sql/metrics/SparderMetricsListener.scala | 144 ++++
 .../kylin/rest/metrics/QueryMetricsFacade.java     | 164 +----
 .../org/apache/kylin/rest/service/CubeService.java |   6 +-
 .../kylin/rest/service/DashboardService.java       |  21 +-
 .../apache/kylin/rest/service/QueryService.java    |  16 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       |   6 +-
 .../tool/metrics/systemcube/CubeDescCreator.java   | 156 +++--
 .../metrics/systemcube/CubeInstanceCreator.java    |  14 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  | 136 ++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  20 +-
 .../tool/metrics/systemcube/ModelCreator.java      | 138 ++--
 .../kylin/tool/metrics/systemcube/SCCreator.java   |  18 +-
 .../systemcube/streamingv2/KafkaTopicCreator.java  |   6 +-
 19 files changed, 1450 insertions(+), 379 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dc91a7f..36950ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2349,20 +2349,28 @@ public abstract class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQuery() {
-        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
+    public String getKylinMetricsSubjectQueryExecution() {
+        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY_EXECUTION") + "_" + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryCube() {
-        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_CUBE") + "_"
+    public String getKylinMetricsSubjectQuerySparkJob() {
+        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_SPARK_JOB") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryRpcCall() {
-        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_RPC") + "_"
+    public String getKylinMetricsSubjectQuerySparkStage() {
+        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_SPARK_STAGE") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
+    public int getKylinMetricsCacheExpireSeconds() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+    }
+
+    public int getKylinMetricsCacheMaxEntries() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.max-entries", "10000"));
+    }
+
     public Map<String, String> getKylinMetricsConf() {
         return getPropertiesByPrefix("kylin.metrics.");
     }
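[Reference, not part of the patch] The two getters added above introduce new configuration keys with the defaults shown in the code: 600 seconds and 10000 entries. They can be overridden in conf/kylin.properties, for example:

    kylin.metrics.query-cache.expire-seconds=600
    kylin.metrics.query-cache.max-entries=10000

The expire time also serves as the period of the cleanup task that flushes evicted entries to the metrics reservoir (see QuerySparkMetrics below).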
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
new file mode 100644
index 0000000..ed2430c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
+import org.apache.kylin.shaded.com.google.common.cache.Cache;
+import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class QuerySparkMetrics {
+    private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
+    private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+    private static final int sparkMetricsNum = 10;
+    private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
+
+    private QuerySparkMetrics() {
+        queryExecutionMetricsMap = CacheBuilder.newBuilder()
+                .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
+                .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
+                        TimeUnit.SECONDS)
+                .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+                        try {
+                            updateMetricsToReservoir(notification.getKey(), notification.getValue());
+                            logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+                                    notification.getKey(), notification.getCause());
+                        } catch (Exception e) {
+                            logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+                                    notification.getKey(), notification.getCause());
+                        }
+                    }
+                }).build();
+
+        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                queryExecutionMetricsMap.cleanUp();
+            }
+        }, KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
+                KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+
+    }
+
+    public static QuerySparkMetrics getInstance() {
+        return instance;
+    }
+
+    public void onJobStart(String queryId, String sparderName, long executionId, long executionStartTime, int jobId,
+            long jobStartTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics == null) {
+            queryExecutionMetrics = new QueryExecutionMetrics();
+            ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap = Maps.newConcurrentMap();
+            queryExecutionMetrics.setQueryId(queryId);
+            queryExecutionMetrics.setSparderName(sparderName);
+            queryExecutionMetrics.setExecutionId(executionId);
+            queryExecutionMetrics.setStartTime(executionStartTime);
+            queryExecutionMetrics.setSparkJobMetricsMap(sparkJobMetricsMap);
+            queryExecutionMetricsMap.put(queryId, queryExecutionMetrics);
+        }
+        SparkJobMetrics sparkJobMetrics = new SparkJobMetrics();
+        sparkJobMetrics.setExecutionId(executionId);
+        sparkJobMetrics.setJobId(jobId);
+        sparkJobMetrics.setStartTime(jobStartTime);
+
+        ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap = Maps.newConcurrentMap();
+        sparkJobMetrics.setSparkStageMetricsMap(sparkStageMetricsMap);
+
+        queryExecutionMetrics.getSparkJobMetricsMap().put(jobId, sparkJobMetrics);
+    }
+
+    public void onSparkStageStart(String queryId, int jobId, int stageId, String stageType, long submitTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) {
+            SparkStageMetrics sparkStageMetrics = new SparkStageMetrics();
+            sparkStageMetrics.setStageId(stageId);
+            sparkStageMetrics.setStageType(stageType);
+            sparkStageMetrics.setSubmitTime(submitTime);
+            queryExecutionMetrics.getSparkJobMetricsMap().get(jobId).getSparkStageMetricsMap().put(stageId,
+                    sparkStageMetrics);
+        }
+    }
+
+    public void updateSparkStageMetrics(String queryId, int jobId, int stageId, boolean isSuccess,
+            SparkStageMetrics sparkStageMetricsEnd) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId);
+            if (sparkJobMetrics != null) {
+                SparkStageMetrics sparkStageMetrics = sparkJobMetrics.getSparkStageMetricsMap().get(stageId);
+                if (sparkStageMetrics != null) {
+                    sparkStageMetrics.setSuccess(isSuccess);
+                    sparkStageMetrics.setMetrics(sparkStageMetricsEnd.getResultSize(),
+                            sparkStageMetricsEnd.getExecutorDeserializeTime(),
+                            sparkStageMetricsEnd.getExecutorDeserializeCpuTime(),
+                            sparkStageMetricsEnd.getExecutorRunTime(), sparkStageMetricsEnd.getExecutorCpuTime(),
+                            sparkStageMetricsEnd.getJvmGCTime(), sparkStageMetricsEnd.getResultSerializationTime(),
+                            sparkStageMetricsEnd.getMemoryBytesSpilled(), sparkStageMetricsEnd.getDiskBytesSpilled(),
+                            sparkStageMetricsEnd.getPeakExecutionMemory());
+                }
+            }
+        }
+    }
+
+    public void updateSparkJobMetrics(String queryId, int jobId, long jobEndTime, boolean isSuccess) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) {
+            SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId);
+            sparkJobMetrics.setEndTime(jobEndTime);
+            sparkJobMetrics.setSuccess(isSuccess);
+        }
+    }
+
+    public void updateExecutionMetrics(String queryId, long executionEndTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setEndTime(executionEndTime);
+        }
+    }
+
+    public Cache<String, QueryExecutionMetrics> getQueryExecutionMetricsMap() {
+        return queryExecutionMetricsMap;
+    }
+
+    public QueryExecutionMetrics getQueryExecutionMetrics(String queryId) {
+        return queryExecutionMetricsMap.getIfPresent(queryId);
+    }
+
+    public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setRealization(realizationName);
+            queryExecutionMetrics.setRealizationType(realizationType);
+            queryExecutionMetrics.setCuboidIds(cuboidIds);
+        }
+    }
+
+    /**
+     * report query related metrics
+     */
+    public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+        if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
+            return;
+        }
+        if (queryExecutionMetrics != null) {
+            RecordEvent queryExecutionMetricsEvent = new TimedRecordEvent(
+                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryExecution());
+
+            setQueryWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getUser(),
+                    queryExecutionMetrics.getSqlIdCode(), queryExecutionMetrics.getQueryType(),
+                    queryId, queryExecutionMetrics.getProject(),
+                    queryExecutionMetrics.getException());
+
+            setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(),
+                    queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(),
+                    queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+                    queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
+
+            setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
+                    queryExecutionMetrics.getTotalScanCount(), queryExecutionMetrics.getTotalScanBytes(),
+                    queryExecutionMetrics.getResultCount());
+
+            long[] queryExecutionMetricsList = new long[sparkMetricsNum];
+            for (Map.Entry<Integer, QuerySparkMetrics.SparkJobMetrics> sparkJobMetricsEntry : queryExecutionMetrics
+                    .getSparkJobMetricsMap().entrySet()) {
+                RecordEvent sparkJobMetricsEvent = new TimedRecordEvent(
+                        KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkJob());
+
+                setSparkJobWrapper(sparkJobMetricsEvent, queryExecutionMetrics.getProject(),
+                        queryId, queryExecutionMetrics.getExecutionId(),
+                        sparkJobMetricsEntry.getValue().getJobId(), sparkJobMetricsEntry.getValue().getStartTime(),
+                        sparkJobMetricsEntry.getValue().getEndTime(), sparkJobMetricsEntry.getValue().isSuccess());
+
+                long[] sparkJobMetricsList = new long[sparkMetricsNum];
+                for (Map.Entry<Integer, QuerySparkMetrics.SparkStageMetrics> sparkStageMetricsEntry : sparkJobMetricsEntry
+                        .getValue().getSparkStageMetricsMap().entrySet()) {
+                    RecordEvent sparkStageMetricsEvent = new TimedRecordEvent(
+                            KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkStage());
+                    QuerySparkMetrics.SparkStageMetrics sparkStageMetrics = sparkStageMetricsEntry.getValue();
+                    setStageWrapper(sparkStageMetricsEvent, queryExecutionMetrics.getProject(), null,
+                            queryId, queryExecutionMetrics.getExecutionId(),
+                            sparkJobMetricsEntry.getValue().getJobId(), sparkStageMetrics.getStageId(),
+                            sparkStageMetrics.getSubmitTime(), sparkStageMetrics.isSuccess());
+                    setStageMetrics(sparkStageMetricsEvent, sparkStageMetrics.getResultSize(),
+                            sparkStageMetrics.getExecutorDeserializeTime(),
+                            sparkStageMetrics.getExecutorDeserializeCpuTime(), sparkStageMetrics.getExecutorRunTime(),
+                            sparkStageMetrics.getExecutorCpuTime(), sparkStageMetrics.getJvmGCTime(),
+                            sparkStageMetrics.getResultSerializationTime(), sparkStageMetrics.getMemoryBytesSpilled(),
+                            sparkStageMetrics.getDiskBytesSpilled(), sparkStageMetrics.getPeakExecutionMemory());
+                    //Update spark stage level metrics
+                    MetricsManager.getInstance().update(sparkStageMetricsEvent);
+
+                    sparkJobMetricsList[0] += sparkStageMetrics.getResultSize();
+                    sparkJobMetricsList[1] += sparkStageMetrics.getExecutorDeserializeTime();
+                    sparkJobMetricsList[2] += sparkStageMetrics.getExecutorDeserializeCpuTime();
+                    sparkJobMetricsList[3] += sparkStageMetrics.getExecutorRunTime();
+                    sparkJobMetricsList[4] += sparkStageMetrics.getExecutorCpuTime();
+                    sparkJobMetricsList[5] += sparkStageMetrics.getJvmGCTime();
+                    sparkJobMetricsList[6] += sparkStageMetrics.getResultSerializationTime();
+                    sparkJobMetricsList[7] += sparkStageMetrics.getMemoryBytesSpilled();
+                    sparkJobMetricsList[8] += sparkStageMetrics.getDiskBytesSpilled();
+                    sparkJobMetricsList[9] += sparkStageMetrics.getPeakExecutionMemory();
+                }
+                setSparkJobMetrics(sparkJobMetricsEvent, sparkJobMetricsList[0], sparkJobMetricsList[1],
+                        sparkJobMetricsList[2], sparkJobMetricsList[3], sparkJobMetricsList[4], sparkJobMetricsList[5],
+                        sparkJobMetricsList[6], sparkJobMetricsList[7], sparkJobMetricsList[8], sparkJobMetricsList[9]);
+                //Update spark job level metrics
+                MetricsManager.getInstance().update(sparkJobMetricsEvent);
+
+                for (int i = 0; i < sparkMetricsNum; i++) {
+                    queryExecutionMetricsList[i] += sparkJobMetricsList[i];
+                }
+            }
+            setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+                    queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
+                    queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
+                    queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
+                    queryExecutionMetricsList[9]);
+            //Update execution level metrics
+            MetricsManager.getInstance().update(queryExecutionMetricsEvent);
+        }
+    }
+
+    private static void setQueryWrapper(RecordEvent metricsEvent, String user, long sqlIdCode, String queryType,
+            String queryId, String project, String exception) {
+        metricsEvent.put(QuerySparkExecutionEnum.USER.toString(), user);
+        metricsEvent.put(QuerySparkExecutionEnum.ID_CODE.toString(), sqlIdCode);
+        metricsEvent.put(QuerySparkExecutionEnum.TYPE.toString(), queryType);
+        metricsEvent.put(QuerySparkExecutionEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkExecutionEnum.PROJECT.toString(), project);
+        metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
+    }
+
+    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
+            String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+        metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), realizationType);
+        metricsEvent.put(QuerySparkExecutionEnum.CUBOID_IDS.toString(), cuboidIds);
+        metricsEvent.put(QuerySparkExecutionEnum.START_TIME.toString(), startTime);
+        metricsEvent.put(QuerySparkExecutionEnum.END_TIME.toString(), endTime);
+    }
+
+    private static void setQueryMetrics(RecordEvent metricsEvent, long sqlDuration, long totalScanCount,
+            long totalScanBytes, long resultCount) {
+        metricsEvent.put(QuerySparkExecutionEnum.TIME_COST.toString(), sqlDuration);
+        metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), totalScanCount);
+        metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), totalScanBytes);
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_COUNT.toString(), resultCount);
+    }
+
+    private static void setSparkExecutionMetrics(RecordEvent metricsEvent, long executionDuration, long resultSize,
+            long executorDeserializeTime, long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime,
+            long jvmGCTime, long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled,
+            long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), executionDuration);
+
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setSparkJobMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime,
+            long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime,
+            long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkJobEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkJobEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setStageMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime,
+            long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime,
+            long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkStageEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkStageEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setStageWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
+            String queryId, long executionId, int jobId, int stageId, long submitTime, boolean isSuccess) {
+        metricsEvent.put(QuerySparkStageEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QuerySparkStageEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QuerySparkStageEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkStageEnum.JOB_ID.toString(), jobId);
+        metricsEvent.put(QuerySparkStageEnum.STAGE_ID.toString(), stageId);
+        metricsEvent.put(QuerySparkStageEnum.SUBMIT_TIME.toString(), submitTime);
+        metricsEvent.put(QuerySparkStageEnum.IF_SUCCESS.toString(), isSuccess);
+    }
+
+    private static void setSparkJobWrapper(RecordEvent metricsEvent, String projectName, String queryId,
+            long executionId, int jobId, long startTime, long endTime, boolean isSuccess) {
+        metricsEvent.put(QuerySparkJobEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QuerySparkJobEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkJobEnum.JOB_ID.toString(), jobId);
+        metricsEvent.put(QuerySparkJobEnum.START_TIME.toString(), startTime);
+        metricsEvent.put(QuerySparkJobEnum.END_TIME.toString(), endTime);
+        metricsEvent.put(QuerySparkJobEnum.IF_SUCCESS.toString(), isSuccess);
+    }
+
+    public static class QueryExecutionMetrics implements Serializable {
+        private long sqlIdCode;
+        private String user;
+        private String queryType;
+        private String project;
+        private String exception;
+        private long executionId;
+        private String sparderName;
+        private long executionDuration;
+        private String queryId;
+        private String realization;
+        private int realizationType;
+        private String cuboidIds;
+        private long startTime;
+        private long endTime;
+        private ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap;
+
+        private long sqlDuration;
+        private long totalScanCount;
+        private long totalScanBytes;
+        private int resultCount;
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public int getResultCount() {
+            return resultCount;
+        }
+
+        public long getSqlDuration() {
+            return sqlDuration;
+        }
+
+        public long getTotalScanBytes() {
+            return totalScanBytes;
+        }
+
+        public long getTotalScanCount() {
+            return totalScanCount;
+        }
+
+        public void setResultCount(int resultCount) {
+            this.resultCount = resultCount;
+        }
+
+        public void setSqlDuration(long sqlDuration) {
+            this.sqlDuration = sqlDuration;
+        }
+
+        public void setTotalScanBytes(long totalScanBytes) {
+            this.totalScanBytes = totalScanBytes;
+        }
+
+        public void setTotalScanCount(long totalScanCount) {
+            this.totalScanCount = totalScanCount;
+        }
+
+        public String getException() {
+            return exception;
+        }
+
+        public void setException(String exception) {
+            this.exception = exception;
+        }
+
+        public void setProject(String project) {
+            this.project = project;
+        }
+
+        public String getProject() {
+            return project;
+        }
+
+        public String getQueryType() {
+            return queryType;
+        }
+
+        public long getSqlIdCode() {
+            return sqlIdCode;
+        }
+
+        public void setQueryType(String queryType) {
+            this.queryType = queryType;
+        }
+
+        public void setSqlIdCode(long sqlIdCode) {
+            this.sqlIdCode = sqlIdCode;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public long getStartTime() {
+            return startTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public void setStartTime(long startTime) {
+            this.startTime = startTime;
+        }
+
+        public void setQueryId(String queryId) {
+            this.queryId = queryId;
+        }
+
+        public String getQueryId() {
+            return queryId;
+        }
+
+        public long getExecutionDuration() {
+            return executionDuration;
+        }
+
+        public void setExecutionDuration(long executionDuration) {
+            this.executionDuration = executionDuration;
+        }
+
+        public ConcurrentMap<Integer, SparkJobMetrics> getSparkJobMetricsMap() {
+            return sparkJobMetricsMap;
+        }
+
+        public long getExecutionId() {
+            return executionId;
+        }
+
+        public String getSparderName() {
+            return sparderName;
+        }
+
+        public void setExecutionId(long executionId) {
+            this.executionId = executionId;
+        }
+
+        public void setSparderName(String sparderName) {
+            this.sparderName = sparderName;
+        }
+
+        public String getCuboidIds() {
+            return cuboidIds;
+        }
+
+        public void setCuboidIds(String cuboidIds) {
+            this.cuboidIds = cuboidIds;
+        }
+
+        public String getRealization() {
+            return realization;
+        }
+
+        public int getRealizationType() {
+            return realizationType;
+        }
+
+        public void setRealization(String realization) {
+            this.realization = realization;
+        }
+
+        public void setRealizationType(int realizationType) {
+            this.realizationType = realizationType;
+        }
+
+        public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
+            this.sparkJobMetricsMap = sparkJobMetricsMap;
+        }
+    }
+
+    public static class SparkJobMetrics implements Serializable {
+        private long executionId;
+        private int jobId;
+        private long startTime;
+        private long endTime;
+        private boolean isSuccess;
+        private ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap;
+
+        public void setStartTime(long startTime) {
+            this.startTime = startTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public long getStartTime() {
+            return startTime;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public void setExecutionId(long executionId) {
+            this.executionId = executionId;
+        }
+
+        public long getExecutionId() {
+            return executionId;
+        }
+
+        public void setSparkStageMetricsMap(ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap) {
+            this.sparkStageMetricsMap = sparkStageMetricsMap;
+        }
+
+        public void setJobId(int jobId) {
+            this.jobId = jobId;
+        }
+
+        public void setSuccess(boolean success) {
+            isSuccess = success;
+        }
+
+        public boolean isSuccess() {
+            return isSuccess;
+        }
+
+        public ConcurrentMap<Integer, SparkStageMetrics> getSparkStageMetricsMap() {
+            return sparkStageMetricsMap;
+        }
+
+        public int getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class SparkStageMetrics implements Serializable {
+        private int stageId;
+        private String stageType;
+        private long submitTime;
+        private long endTime;
+        private boolean isSuccess;
+        private long resultSize;
+        private long executorDeserializeTime;
+        private long executorDeserializeCpuTime;
+        private long executorRunTime;
+        private long executorCpuTime;
+        private long jvmGCTime;
+        private long resultSerializationTime;
+        private long memoryBytesSpilled;
+        private long diskBytesSpilled;
+        private long peakExecutionMemory;
+
+        public void setMetrics(long resultSize, long executorDeserializeTime, long executorDeserializeCpuTime,
+                long executorRunTime, long executorCpuTime, long jvmGCTime, long resultSerializationTime,
+                long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+            this.resultSize = resultSize;
+            this.executorDeserializeTime = executorDeserializeTime;
+            this.executorDeserializeCpuTime = executorDeserializeCpuTime;
+            this.executorRunTime = executorRunTime;
+            this.executorCpuTime = executorCpuTime;
+            this.jvmGCTime = jvmGCTime;
+            this.resultSerializationTime = resultSerializationTime;
+            this.memoryBytesSpilled = memoryBytesSpilled;
+            this.diskBytesSpilled = diskBytesSpilled;
+            this.peakExecutionMemory = peakExecutionMemory;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public long getSubmitTime() {
+            return submitTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public void setSubmitTime(long submitTime) {
+            this.submitTime = submitTime;
+        }
+
+        public boolean isSuccess() {
+            return isSuccess;
+        }
+
+        public void setSuccess(boolean success) {
+            isSuccess = success;
+        }
+
+        public void setStageType(String stageType) {
+            this.stageType = stageType;
+        }
+
+        public void setStageId(int stageId) {
+            this.stageId = stageId;
+        }
+
+        public void setResultSize(long resultSize) {
+            this.resultSize = resultSize;
+        }
+
+        public void setResultSerializationTime(long resultSerializationTime) {
+            this.resultSerializationTime = resultSerializationTime;
+        }
+
+        public void setPeakExecutionMemory(long peakExecutionMemory) {
+            this.peakExecutionMemory = peakExecutionMemory;
+        }
+
+        public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+            this.memoryBytesSpilled = memoryBytesSpilled;
+        }
+
+        public void setJvmGCTime(long jvmGCTime) {
+            this.jvmGCTime = jvmGCTime;
+        }
+
+        public void setExecutorRunTime(long executorRunTime) {
+            this.executorRunTime = executorRunTime;
+        }
+
+        public void setExecutorDeserializeTime(long executorDeserializeTime) {
+            this.executorDeserializeTime = executorDeserializeTime;
+        }
+
+        public void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime) {
+            this.executorDeserializeCpuTime = executorDeserializeCpuTime;
+        }
+
+        public void setExecutorCpuTime(long executorCpuTime) {
+            this.executorCpuTime = executorCpuTime;
+        }
+
+        public void setDiskBytesSpilled(long diskBytesSpilled) {
+            this.diskBytesSpilled = diskBytesSpilled;
+        }
+
+        public String getStageType() {
+            return stageType;
+        }
+
+        public long getResultSize() {
+            return resultSize;
+        }
+
+        public long getResultSerializationTime() {
+            return resultSerializationTime;
+        }
+
+        public long getPeakExecutionMemory() {
+            return peakExecutionMemory;
+        }
+
+        public long getMemoryBytesSpilled() {
+            return memoryBytesSpilled;
+        }
+
+        public long getJvmGCTime() {
+            return jvmGCTime;
+        }
+
+        public long getExecutorRunTime() {
+            return executorRunTime;
+        }
+
+        public long getExecutorDeserializeTime() {
+            return executorDeserializeTime;
+        }
+
+        public long getExecutorDeserializeCpuTime() {
+            return executorDeserializeCpuTime;
+        }
+
+        public long getExecutorCpuTime() {
+            return executorCpuTime;
+        }
+
+        public long getDiskBytesSpilled() {
+            return diskBytesSpilled;
+        }
+
+        public int getStageId() {
+            return stageId;
+        }
+    }
+}
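[Reference, not part of the patch] A minimal sketch of how a caller can drive the cache above, assuming a KylinConfig is resolvable from the environment; the ids, names and metric values are made up, and in this commit the equivalent calls are issued by the SparderMetricsListener added further below.

import org.apache.kylin.metrics.QuerySparkMetrics;

public class QuerySparkMetricsExample {
    public static void main(String[] args) {
        QuerySparkMetrics metrics = QuerySparkMetrics.getInstance();
        String queryId = "query-1";                  // hypothetical query id
        long now = System.currentTimeMillis();

        // Register an execution and one of its Spark jobs/stages as they start.
        metrics.onJobStart(queryId, "sparder-app", 1L, now, 0, now);
        metrics.onSparkStageStart(queryId, 0, 0, "ResultStage", now);

        // When the stage finishes, copy its accumulated task metrics into the cache entry.
        QuerySparkMetrics.SparkStageMetrics stage = new QuerySparkMetrics.SparkStageMetrics();
        stage.setMetrics(1024L, 10L, 8L, 500L, 400L, 20L, 5L, 0L, 0L, 64L * 1024 * 1024);
        metrics.updateSparkStageMetrics(queryId, 0, 0, true, stage);

        // Close out the job and the whole execution.
        metrics.updateSparkJobMetrics(queryId, 0, System.currentTimeMillis(), true);
        metrics.updateExecutionMetrics(queryId, System.currentTimeMillis());

        // The entry stays cached until it expires (kylin.metrics.query-cache.expire-seconds);
        // the removal listener then calls updateMetricsToReservoir() to push the
        // execution, job and stage events into the Kylin metrics reservoir.
    }
}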
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java
new file mode 100644
index 0000000..b390dd0
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+public enum QuerySparkExecutionEnum {
+    ID_CODE("QUERY_HASH_CODE"),
+    SQL("QUERY_SQL"),
+    PROJECT("PROJECT"),
+    TYPE("QUERY_TYPE"),
+    REALIZATION("REALIZATION"),
+    REALIZATION_TYPE("REALIZATION_TYPE"),
+    CUBOID_IDS("CUBOID_IDS"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    USER("KUSER"),
+    SPARDER_NAME("SPARDER_NAME"),
+    EXCEPTION("EXCEPTION"),
+    START_TIME("START_TIME"),
+    END_TIME("END_TIME"),
+
+
+    TIME_COST("QUERY_TIME_COST"),
+    TOTAL_SCAN_COUNT("TOTAL_SCAN_COUNT"),
+    TOTAL_SCAN_BYTES("TOTAL_SCAN_BYTES"),
+    RESULT_COUNT("RESULT_COUNT"),
+
+    EXECUTION_DURATION("EXECUTION_DURATION"),
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkExecutionEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkExecutionEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkExecutionEnum property : QuerySparkExecutionEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java
new file mode 100644
index 0000000..0f041c2
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+public enum QuerySparkJobEnum {
+    PROJECT("PROJECT"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    JOB_ID("JOB_ID"),
+    START_TIME("START_TIME"),
+    END_TIME("END_TIME"),
+    IF_SUCCESS("IF_SUCCESS"),
+
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkJobEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkJobEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkJobEnum property : QuerySparkJobEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java
new file mode 100644
index 0000000..dcb0f42
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+/**
+ * Definition of Metrics dimension and measure for Spark stage
+ */
+public enum QuerySparkStageEnum {
+    PROJECT("PROJECT"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    JOB_ID("JOB_ID"),
+    STAGE_ID("STAGE_ID"),
+    SUBMIT_TIME("SUBMIT_TIME"),
+    REALIZATION("REALIZATION"),
+    CUBOID_ID("CUBOID_NAME"),
+    IF_SUCCESS("IF_SUCCESS"),
+
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkStageEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkStageEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkStageEnum property : QuerySparkStageEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 9e89a62..0d25dba 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -37,6 +37,7 @@ import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.sql.metrics.SparderMetricsListener
 import org.apache.spark.utils.YarnInfoFetcherUtils
 
 // scalastyle:off
@@ -150,6 +151,11 @@ object SparderContext extends Logging {
                     .enableHiveSupport()
                     .getOrCreateKylinSession()
               }
+              if (kylinConf.isKylinMetricsReporterForQueryEnabled) {
+                val appStatusListener = new SparderMetricsListener()
+                sparkSession.sparkContext.addSparkListener(appStatusListener)
+                logInfo("Query metrics reporter is enabled, sparder metrics 
listener is added.")
+              }
               spark = sparkSession
               val appid = sparkSession.sparkContext.applicationId
               // write application id to file 'sparkappid'
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
new file mode 100644
index 0000000..6237235
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metrics
+
+import org.apache.kylin.metrics.QuerySparkMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+
+class SparderMetricsListener() extends SparkListener with Logging {
+
+  var stageJobMap: Map[Int, Int] = Map()
+  var jobExecutionMap: Map[Int, QueryInformation] = Map()
+  var executionInformationMap: Map[Long, ExecutionInformation] = Map()
+
+  val queryExecutionMetrics = QuerySparkMetrics.getInstance()
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    val sparderName = event.properties.getProperty("spark.app.name")
+    val kylinQueryId = event.properties.getProperty("kylin.query.id")
+
+    if (executionIdString == null || kylinQueryId == null) {
+      logInfo(s"The job ${event.jobId} is not a query job.")
+      return
+    }
+
+    val executionId = executionIdString.toLong
+
+    if (executionInformationMap.apply(executionId).sparderName == null) {
+      val executionInformation = new ExecutionInformation(kylinQueryId,
+        executionInformationMap.apply(executionId).executionStartTime, sparderName)
+      executionInformationMap += (executionId -> executionInformation)
+    }
+
+    jobExecutionMap += (event.jobId -> new QueryInformation(kylinQueryId, executionId))
+
+    val stages = event.stageInfos.iterator
+    while (stages.hasNext) {
+      val stage: StageInfo = stages.next()
+      stageJobMap += (stage.stageId -> event.jobId)
+    }
+
+    queryExecutionMetrics.onJobStart(kylinQueryId, sparderName, executionId,
+      executionInformationMap.apply(executionId).executionStartTime, event.jobId, event.time)
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    if (jobExecutionMap.contains(event.jobId)) {
+      val isSuccess = event.jobResult match {
+        case JobSucceeded => true
+        case _ => false
+      }
+      queryExecutionMetrics.updateSparkJobMetrics(jobExecutionMap.apply(event.jobId).queryId, event.jobId, event.time,
+        isSuccess)
+      logInfo(s"The job ${event.jobId} has completed and the relevant metrics are updated to the cache")
+      jobExecutionMap -= event.jobId
+    }
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    val queryId = event.properties.getProperty("kylin.query.id")
+    val stageId = event.stageInfo.stageId
+
+    if (stageJobMap.contains(stageId)) {
+      val submitTime = event.stageInfo.submissionTime match {
+        case Some(x) => x
+        case None => -1
+      }
+      queryExecutionMetrics.onSparkStageStart(queryId, stageJobMap.apply(stageId), stageId, event.stageInfo.name, submitTime)
+    }
+  }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    val stageInfo = event.stageInfo
+    if (stageJobMap.contains(stageInfo.stageId) && jobExecutionMap.contains(stageJobMap.apply(stageInfo.stageId))) {
+      val isSuccess = stageInfo.getStatusString match {
+        case "succeeded" => true
+        case _ => false
+      }
+      val stageMetrics = stageInfo.taskMetrics
+      val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
+      sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeTime,
+        stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+        stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime,
+        stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
+      queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
+        stageJobMap.apply(stageInfo.stageId), stageInfo.stageId, isSuccess, sparkStageMetrics)
+      stageJobMap -= stageInfo.stageId
+
+      logInfo(s"The stage ${event.stageInfo.stageId} has completed and the 
relevant metrics are updated to the cache")
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: SparkListenerSQLExecutionStart => onQueryExecutionStart(e)
+      case e: SparkListenerSQLExecutionEnd => onQueryExecutionEnd(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def onQueryExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    executionInformationMap += (event.executionId -> new ExecutionInformation(null, event.time, null))
+  }
+
+  private def onQueryExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    val executionInformation = executionInformationMap.apply(event.executionId)
+    queryExecutionMetrics.updateExecutionMetrics(executionInformation.queryId, event.time)
+    executionInformationMap -= event.executionId
+    logInfo(s"QueryExecution ${event.executionId} is completed at ${event.time} " +
+      s"and the relevant metrics are updated to the cache")
+  }
+}
+
+// ============================
+
+class ExecutionInformation(
+                            var queryId: String,
+                            var executionStartTime: Long,
+                            var sparderName: String
+                          )
+
+class QueryInformation(
+                        val queryId: String,
+                        val executionId: Long
+                      )
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index f51bf08..7a8f2d7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -20,7 +20,6 @@ package org.apache.kylin.rest.metrics;
 
 import java.nio.charset.Charset;
 import java.util.Locale;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -28,23 +27,15 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextFacade;
-import org.apache.kylin.metrics.MetricsManager;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.security.core.context.SecurityContextHolder;
 
 import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
 import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 /**
  * The entrance of metrics features.
@@ -70,9 +61,9 @@ public class QueryMetricsFacade {
         return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong();
     }
 
-    public static void updateMetrics(SQLRequest sqlRequest, SQLResponse 
sqlResponse) {
+    public static void updateMetrics(String queryId, SQLRequest sqlRequest, 
SQLResponse sqlResponse) {
         updateMetricsToLocal(sqlRequest, sqlResponse);
-        updateMetricsToReservoir(sqlRequest, sqlResponse);
+        updateMetricsToCache(queryId, sqlRequest, sqlResponse);
     }
 
     private static void updateMetricsToLocal(SQLRequest sqlRequest, 
SQLResponse sqlResponse) {
@@ -89,71 +80,27 @@ public class QueryMetricsFacade {
         update(getQueryMetrics(cubeMetricName), sqlResponse);
     }
 
-    /**
-     * report query related metrics
-     */
-    private static void updateMetricsToReservoir(SQLRequest sqlRequest, 
SQLResponse sqlResponse) {
-        if 
(!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
-            return;
-        }
+    private static void updateMetricsToCache(String queryId, SQLRequest 
sqlRequest, SQLResponse sqlResponse) {
         String user = 
SecurityContextHolder.getContext().getAuthentication().getName();
         if (user == null) {
             user = "unknown";
         }
-        for (QueryContext.RPCStatistics entry : 
QueryContextFacade.current().getRpcStatisticsList()) {
-            RecordEvent rpcMetricsEvent = new TimedRecordEvent(
-                    
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
-            setRPCWrapper(rpcMetricsEvent, //
-                    norm(sqlRequest.getProject()), entry.getRealizationName(), 
entry.getRpcServer(),
-                    entry.getException());
-            setRPCStats(rpcMetricsEvent, //
-                    entry.getCallTimeMs(), entry.getSkippedRows(), 
entry.getScannedRows(), entry.getReturnedRows(),
-                    entry.getAggregatedRows());
-            //For update rpc level related metrics
-            MetricsManager.getInstance().update(rpcMetricsEvent);
-        }
-        for (QueryContext.CubeSegmentStatisticsResult contextEntry : 
sqlResponse.getCubeSegmentStatisticsList()) {
-            RecordEvent queryMetricsEvent = new TimedRecordEvent(
-                    
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery());
-            setQueryWrapper(queryMetricsEvent, //
-                    user, sqlRequest.getSql(), 
sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(),
-                    norm(sqlRequest.getProject()), 
contextEntry.getRealization(), contextEntry.getRealizationType(),
-                    sqlResponse.getThrowable());
-
-            long totalStorageReturnCount = 0L;
-            if 
(contextEntry.getQueryType().equalsIgnoreCase(OLAPQuery.EnumeratorTypeEnum.OLAP.name()))
 {
-                for (Map<String, QueryContext.CubeSegmentStatistics> cubeEntry 
: contextEntry.getCubeSegmentStatisticsMap()
-                        .values()) {
-                    for (QueryContext.CubeSegmentStatistics segmentEntry : 
cubeEntry.values()) {
-                        RecordEvent cubeSegmentMetricsEvent = new 
TimedRecordEvent(
-                                
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube());
 
-                        setCubeWrapper(cubeSegmentMetricsEvent, //
-                                norm(sqlRequest.getProject()), 
segmentEntry.getCubeName(), segmentEntry.getSegmentName(),
-                                segmentEntry.getSourceCuboidId(), 
segmentEntry.getTargetCuboidId(),
-                                segmentEntry.getFilterMask());
-
-                        setCubeStats(cubeSegmentMetricsEvent, //
-                                segmentEntry.getCallCount(), 
segmentEntry.getCallTimeSum(), segmentEntry.getCallTimeMax(),
-                                segmentEntry.getStorageSkippedRows(), 
segmentEntry.getStorageScannedRows(),
-                                segmentEntry.getStorageReturnedRows(), 
segmentEntry.getStorageAggregatedRows(),
-                                segmentEntry.isIfSuccess(), 1.0 / 
cubeEntry.size());
-
-                        totalStorageReturnCount += 
segmentEntry.getStorageReturnedRows();
-                        //For update cube segment level related query metrics
-                        
MetricsManager.getInstance().update(cubeSegmentMetricsEvent);
-                    }
-                }
-            } else {
-                if (!sqlResponse.getIsException()) {
-                    totalStorageReturnCount = sqlResponse.getResults().size();
-                }
-            }
-            setQueryStats(queryMetricsEvent, //
-                    sqlResponse.getDuration(), sqlResponse.getResults() == 
null ? 0 : sqlResponse.getResults().size(),
-                    totalStorageReturnCount);
-            //For update query level metrics
-            MetricsManager.getInstance().update(queryMetricsEvent);
+        QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics = 
QuerySparkMetrics.getInstance()
+                .getQueryExecutionMetricsMap().getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setUser(user);
+            
queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql()));
+            queryExecutionMetrics.setProject(norm(sqlRequest.getProject()));
+            
queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : 
"PARQUET");
+
+            queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration());
+            
queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount());
+            
queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes());
+            queryExecutionMetrics.setResultCount(sqlResponse.getResults() == 
null ? 0 : sqlResponse.getResults().size());
+
+            queryExecutionMetrics.setException(sqlResponse.getThrowable() == 
null ? "NULL" :
+                    sqlResponse.getThrowable().getClass().getName());
         }
     }
 
@@ -161,77 +108,6 @@ public class QueryMetricsFacade {
         return project.toUpperCase(Locale.ROOT);
     }
 
-    private static void setRPCWrapper(RecordEvent metricsEvent, String 
projectName, String realizationName,
-            String rpcServer, Throwable throwable) {
-        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), 
realizationName);
-        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), 
rpcServer);
-        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, 
long skipCount, long scanCount,
-            long returnCount, long aggrCount) {
-        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), 
callTimeMs);
-        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), 
skipCount); //Number of skips on region servers based on region meta or fuzzy 
filter
-        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), 
scanCount); //Count scanned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), 
returnCount);//Count returned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), 
scanCount - returnCount); //Count filtered & aggregated by coprocessor
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), 
aggrCount); //Count aggregated by coprocessor
-    }
-
-    private static void setCubeWrapper(RecordEvent metricsEvent, String 
projectName, String cubeName,
-            String segmentName, long sourceCuboidId, long targetCuboidId, long 
filterMask) {
-        metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), 
projectName);
-        metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName);
-        metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), 
segmentName);
-        metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), 
sourceCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), 
targetCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), 
sourceCuboidId == targetCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), 
filterMask);
-    }
-
-    private static void setCubeStats(RecordEvent metricsEvent, long callCount, 
long callTimeSum, long callTimeMax,
-            long skipCount, long scanCount, long returnCount, long aggrCount, 
boolean ifSuccess, double weightPerHit) {
-        metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), 
callCount);
-        metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), 
callTimeSum);
-        metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), 
callTimeMax);
-        metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), 
skipCount);
-        metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), 
scanCount);
-        metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), 
returnCount);
-        metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), 
scanCount - returnCount);
-        metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), 
aggrCount);
-        metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), 
ifSuccess);
-        metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), 
weightPerHit);
-    }
-
-    private static void setQueryWrapper(RecordEvent metricsEvent, String user, 
String sql, String queryType,
-            String projectName, String realizationName, int realizationType, 
Throwable throwable) {
-        metricsEvent.put(QueryPropertyEnum.USER.toString(), user);
-        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), 
getSqlHashCode(sql));
-        metricsEvent.put(QueryPropertyEnum.SQL.toString(), sql);
-        metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType);
-        metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), 
realizationName);
-        metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), 
realizationType);
-        metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    private static void setQueryStats(RecordEvent metricsEvent, long 
callTimeMs, long returnCountByCalcite,
-            long returnCountByStorage) {
-        metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs);
-        metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), 
returnCountByCalcite);
-        metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), 
returnCountByStorage);
-        long countAggrAndFilter = returnCountByStorage - returnCountByCalcite;
-        if (countAggrAndFilter < 0) {
-            countAggrAndFilter = 0;
-            logger.warn(returnCountByStorage + " rows returned by storage less 
than " + returnCountByCalcite
-                    + " rows returned by calcite");
-        }
-        metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), 
countAggrAndFilter);
-    }
-
     private static void update(QueryMetrics queryMetrics, SQLResponse 
sqlResponse) {
         try {
             incrQueryCount(queryMetrics, sqlResponse);
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 63cb4d8..6b60ab8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -1042,7 +1042,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
         String cuboidColumn = isCuboidSource ? 
QueryCubePropertyEnum.CUBOID_SOURCE.toString()
                 : QueryCubePropertyEnum.CUBOID_TARGET.toString();
         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
-        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ")" //
                 + " from " + table//
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
@@ -1057,7 +1057,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
         String cuboidTgt = QueryCubePropertyEnum.CUBOID_TARGET.toString();
         String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString();
         String returnCount = QueryCubePropertyEnum.RETURN_COUNT.toString();
-        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidSource + ", " + cuboidTgt + ", avg(" + 
aggCount + "), avg(" + returnCount + ")"//
                 + " from " + table //
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
@@ -1070,7 +1070,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
     public Map<Long, Long> getCuboidQueryMatchCount(String cubeName) {
         String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
-        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidSource + ", sum(" + hitMeasure + ")" //
                 + " from " + table //
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index f622b89..768876f 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -31,7 +31,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
@@ -119,7 +120,7 @@ public class DashboardService extends BasicService {
         Map<String, String> filterMap = getBaseFilterMap(CategoryEnum.QUERY, 
projectName, startTime, endTime);
         filterMap.putAll(getCubeFilterMap(CategoryEnum.QUERY, cubeName));
         return createPrepareSqlRequest(null, metrics,
-                
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery()),
 filterMap);
+                
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution()),
 filterMap);
     };
 
     public PrepareSqlRequest getJobMetricsSQLRequest(String startTime, String 
endTime, String projectName,
@@ -143,7 +144,7 @@ public class DashboardService extends BasicService {
             if (categoryEnum == CategoryEnum.QUERY) {
                 dimensionSQL = new String[] { 
QueryDimensionEnum.valueOf(dimension).toSQL() };
                 metricSQL = new String[] { 
QueryMetricEnum.valueOf(metric).toSQL() };
-                table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery());
+                table = 
getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution());
             } else if (categoryEnum == CategoryEnum.JOB) {
                 dimensionSQL = new String[] { 
JobDimensionEnum.valueOf(dimension).toSQL() };
                 metricSQL = new String[] { 
JobMetricEnum.valueOf(metric).toSQL() };
@@ -217,10 +218,10 @@ public class DashboardService extends BasicService {
         HashMap<String, String> filterMap = new HashMap<>();
 
         if (category == CategoryEnum.QUERY) {
-            filterMap.put(QueryPropertyEnum.EXCEPTION.toString() + " = ?", 
"NULL");
+            filterMap.put(QuerySparkExecutionEnum.EXCEPTION.toString() + " = 
?", "NULL");
 
             if (!Strings.isNullOrEmpty(cubeName)) {
-                filterMap.put(QueryPropertyEnum.REALIZATION + " = ?", 
cubeName);
+                filterMap.put(QuerySparkExecutionEnum.REALIZATION + " = ?", 
cubeName);
             }
         } else if (category == CategoryEnum.JOB && 
!Strings.isNullOrEmpty(cubeName)) {
             HybridInstance hybridInstance = 
getHybridManager().getHybridInstance(cubeName);
@@ -299,8 +300,8 @@ public class DashboardService extends BasicService {
     }
 
     private enum QueryDimensionEnum {
-        PROJECT(QueryPropertyEnum.PROJECT.toString()), //
-        CUBE(QueryPropertyEnum.REALIZATION.toString()), //
+        PROJECT(QuerySparkExecutionEnum.PROJECT.toString()), //
+        CUBE(QuerySparkExecutionEnum.REALIZATION.toString()), //
         DAY(TimePropertyEnum.DAY_DATE.toString()), //
         WEEK(TimePropertyEnum.WEEK_BEGIN_DATE.toString()), //
         MONTH(TimePropertyEnum.MONTH.toString());
@@ -336,9 +337,9 @@ public class DashboardService extends BasicService {
 
     private enum QueryMetricEnum {
         QUERY_COUNT("count(*)"), //
-        AVG_QUERY_LATENCY("avg(" + QueryPropertyEnum.TIME_COST.toString() + 
")"), //
-        MAX_QUERY_LATENCY("max(" + QueryPropertyEnum.TIME_COST.toString() + 
")"), //
-        MIN_QUERY_LATENCY("min(" + QueryPropertyEnum.TIME_COST.toString() + 
")");
+        AVG_QUERY_LATENCY("avg(" + 
QuerySparkExecutionEnum.TIME_COST.toString() + ")"), //
+        MAX_QUERY_LATENCY("max(" + 
QuerySparkExecutionEnum.TIME_COST.toString() + ")"), //
+        MIN_QUERY_LATENCY("min(" + 
QuerySparkExecutionEnum.TIME_COST.toString() + ")");
 
         private final String sql;
 
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 855174b..f020d6b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -68,6 +68,7 @@ import 
org.apache.kylin.cache.cachemanager.MemcachedCacheManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -120,6 +121,7 @@ import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridManager;
+import org.apache.spark.sql.SparderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -468,9 +470,14 @@ public class QueryService extends BasicService {
             }
 
             sqlResponse.setDuration(queryContext.getAccumulatedMillis());
+            if 
(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId())
 != null) {
+                String sqlTraceUrl = SparderContext.appMasterTrackURL() + 
"/SQL/execution/?id=" +
+                        
QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId()).getExecutionId();
+                sqlResponse.setTraceUrl(sqlTraceUrl);
+            }
             logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
-                recordMetric(sqlRequest, sqlResponse);
+                recordMetric(queryContext.getQueryId(), sqlRequest, 
sqlResponse);
             } catch (Throwable th) {
                 logger.warn("Write metric error.", th);
             }
@@ -585,8 +592,8 @@ public class QueryService extends BasicService {
                 checkCondition(!BackdoorToggles.getDisableCache(), "query 
cache disabled in BackdoorToggles");
     }
 
-    protected void recordMetric(SQLRequest sqlRequest, SQLResponse 
sqlResponse) throws UnknownHostException {
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+    protected void recordMetric(String queryId, SQLRequest sqlRequest, 
SQLResponse sqlResponse) throws UnknownHostException {
+        QueryMetricsFacade.updateMetrics(queryId, sqlRequest, sqlResponse);
         QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
     }
 
@@ -1200,7 +1207,8 @@ public class QueryService extends BasicService {
 
                     realizations.add(realizationName);
                 }
-                queryContext.setContextRealization(ctx.id, realizationName, 
realizationType);
+                
QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), 
realizationName,
+                        realizationType, cuboidIdsSb.toString());
             }
 
 
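As a reading aid for the trace-URL change in QueryService above, here is a small sketch of the URL the SQLResponse now carries, using placeholder values: at runtime the host/application part comes from SparderContext.appMasterTrackURL() and the execution id from the cached QueryExecutionMetrics; 42 and the URL below are invented for illustration.

    object TraceUrlExample {
      def main(args: Array[String]): Unit = {
        // Placeholder values; in QueryService both are obtained from runtime state.
        val trackUrl = "http://yarn-rm:8088/proxy/application_0000000000000_0001" // SparderContext.appMasterTrackURL()
        val executionId = 42L // getQueryExecutionMetrics(queryId).getExecutionId

        // Same concatenation as in QueryService: the Spark Web UI SQL page
        // for the execution that served this query.
        val sqlTraceUrl = trackUrl + "/SQL/execution/?id=" + executionId
        println(sqlTraceUrl)
      }
    }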
diff --git 
a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java 
b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index 10e5d8b..fbacc78 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -71,7 +71,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlResponse.setResults(results);
         sqlResponse.setStorageCacheUsed(true);
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse);
 
         Thread.sleep(2000);
 
@@ -100,7 +100,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlResponse2.setCube("test_cube");
         sqlResponse2.setIsException(true);
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse2);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse2);
 
         Thread.sleep(2000);
 
@@ -146,7 +146,7 @@ public class QueryMetricsTest extends ServiceTestBase {
 
         
sqlResponse.setCubeSegmentStatisticsList(context.getCubeSegmentStatisticsResultList());
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse);
 
         Thread.sleep(2000);
 
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 729dabb..2d82e02 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -43,10 +43,9 @@ import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
@@ -54,11 +53,11 @@ import 
org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 public class CubeDescCreator {
 
-    public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig 
config, MetricsSinkDesc sinkDesc) {
-        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
+    public static CubeDesc 
generateKylinCubeDescForMetricsQueryExecution(KylinConfig config, 
MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution());
 
         //Set for dimensions
-        List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery();
+        List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQueryExecution();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
 
@@ -68,39 +67,49 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQuery();
-        measures.remove(QueryPropertyEnum.ID_CODE.toString());
+        List<String> measures = 
ModelCreator.getMeasuresForMetricsQueryExecution();
+        measures.remove(QuerySparkExecutionEnum.ID_CODE.toString());
         List<MeasureDesc> measureDescList = 
Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
 
-        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQuery();
+        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQueryExecution();
         Map<String, String> measureTypeMap = 
Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
         }
         measureDescList.add(getMeasureCount());
-        
measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(),
-                measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString())));
+        
measureDescList.add(getMeasureMin(QuerySparkExecutionEnum.TIME_COST.toString(),
+                
measureTypeMap.get(QuerySparkExecutionEnum.TIME_COST.toString())));
         for (String measure : measures) {
             measureDescList.add(getMeasureSum(measure, 
measureTypeMap.get(measure)));
             measureDescList.add(getMeasureMax(measure, 
measureTypeMap.get(measure)));
         }
-        
measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString()));
-        
measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString()));
+        
measureDescList.add(getMeasureHLL(QuerySparkExecutionEnum.ID_CODE.toString()));
+        
measureDescList.add(getMeasurePercentile(QuerySparkExecutionEnum.TIME_COST.toString()));
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new 
RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.USER.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.USER.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.PROJECT.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.REALIZATION.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.PROJECT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.CUBOID_IDS.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.REALIZATION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.EXCEPTION.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.TYPE.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.EXCEPTION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.SPARDER_NAME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryPropertyEnum.TYPE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.QUERY_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.START_TIME.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkExecutionEnum.END_TIME.toString(), idx + 1);
         idx++;
         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
         idx++;
@@ -109,11 +118,18 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[2][];
+        String[][] hierarchy_dims = new String[4][];
         hierarchy_dims[0] = getTimeHierarchy();
-        hierarchy_dims[1] = new String[2];
-        hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString();
-        hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString();
+        hierarchy_dims[1] = new String[3];
+        hierarchy_dims[1][0] = 
QuerySparkExecutionEnum.REALIZATION_TYPE.toString();
+        hierarchy_dims[1][1] = QuerySparkExecutionEnum.REALIZATION.toString();
+        hierarchy_dims[1][2] = QuerySparkExecutionEnum.CUBOID_IDS.toString();
+        hierarchy_dims[2] = new String[2];
+        hierarchy_dims[2][0] = QuerySparkExecutionEnum.START_TIME.toString();
+        hierarchy_dims[2][1] = QuerySparkExecutionEnum.END_TIME.toString();
+        hierarchy_dims[3] = new String[2];
+        hierarchy_dims[3][0] = QuerySparkExecutionEnum.SPARDER_NAME.toString();
+        hierarchy_dims[3][1] = 
RecordEvent.RecordReserveKeyEnum.HOST.toString();
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, 
hierarchy_dims[i]);
         }
@@ -135,15 +151,15 @@ public class CubeDescCreator {
                 rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc 
generateKylinCubeDescForMetricsQueryCube(KylinConfig config, MetricsSinkDesc 
sinkDesc) {
-        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
+    public static CubeDesc 
generateKylinCubeDescForMetricsQuerySparkJob(KylinConfig config, 
MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob());
 
         //Set for dimensions
-        List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQueryCube();
+        List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQuerySparkJob();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        dimensions.remove(QueryCubePropertyEnum.PROJECT.toString());
+        dimensions.remove(QuerySparkJobEnum.PROJECT.toString());
 
         List<DimensionDesc> dimensionDescList = 
Lists.newArrayListWithExpectedSize(dimensions.size());
         for (String dimensionName : dimensions) {
@@ -151,10 +167,10 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube();
+        List<String> measures = 
ModelCreator.getMeasuresForMetricsQuerySparkJob();
         List<MeasureDesc> measureDescList = 
Lists.newArrayListWithExpectedSize(measures.size() * 2);
 
-        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQueryCube();
+        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob();
         Map<String, String> measureTypeMap = 
Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
@@ -162,53 +178,43 @@ public class CubeDescCreator {
         measureDescList.add(getMeasureCount());
         for (String measure : measures) {
             measureDescList.add(getMeasureSum(measure, 
measureTypeMap.get(measure)));
-            if 
(!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) {
-                measureDescList.add(getMeasureMax(measure, 
measureTypeMap.get(measure)));
-            }
+            measureDescList.add(getMeasureMax(measure, 
measureTypeMap.get(measure)));
         }
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new 
RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.CUBE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.JOB_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.SEGMENT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.EXECUTION_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.QUERY_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.START_TIME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.END_TIME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1);
-        idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkJobEnum.IF_SUCCESS.toString(), idx + 1);
         idx++;
 
         RowKeyDesc rowKeyDesc = new RowKeyDesc();
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
-        //Set for aggregation group
-        String[] mandatory_dims = new String[] { 
QueryCubePropertyEnum.CUBE.toString() };
-        mandatory_dims = refineColumnWithTable(tableName, mandatory_dims);
-
-        String[][] hierarchy_dims = new String[1][];
+        String[][] hierarchy_dims = new String[2][];
         hierarchy_dims[0] = getTimeHierarchy();
+        hierarchy_dims[1] = new String[3];
+        hierarchy_dims[1][0] = QuerySparkJobEnum.QUERY_ID.toString();
+        hierarchy_dims[1][1] = QuerySparkJobEnum.EXECUTION_ID.toString();
+        hierarchy_dims[1][2] = QuerySparkJobEnum.JOB_ID.toString();
+
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, 
hierarchy_dims[i]);
         }
 
-        String[][] joint_dims = new String[1][];
-        joint_dims[0] = new String[] { 
QueryCubePropertyEnum.CUBOID_SOURCE.toString(),
-                QueryCubePropertyEnum.CUBOID_TARGET.toString() };
-        for (int i = 0; i < joint_dims.length; i++) {
-            joint_dims[i] = refineColumnWithTable(tableName, joint_dims[i]);
-        }
-
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = mandatory_dims;
+        selectRule.mandatoryDims = new String[0];
         selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = joint_dims;
+        selectRule.jointDims = new String[0][0];
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -222,11 +228,11 @@ public class CubeDescCreator {
                 rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig 
config, MetricsSinkDesc sinkDesc) {
-        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
+    public static CubeDesc 
generateKylinCubeDescForMetricsQuerySparkStage(KylinConfig config, 
MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage());
 
         //Set for dimensions
-        List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQueryRPC();
+        List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQuerySparkStage();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
 
@@ -236,10 +242,10 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC();
+        List<String> measures = 
ModelCreator.getMeasuresForMetricsQuerySparkStage();
         List<MeasureDesc> measureDescList = 
Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
 
-        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+        List<Pair<String, String>> measureTypeList = 
HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage();
         Map<String, String> measureTypeMap = 
Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
@@ -249,28 +255,42 @@ public class CubeDescCreator {
             measureDescList.add(getMeasureSum(measure, 
measureTypeMap.get(measure)));
             measureDescList.add(getMeasureMax(measure, 
measureTypeMap.get(measure)));
         }
-        
measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString()));
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new 
RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryRPCPropertyEnum.PROJECT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.PROJECT.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.REALIZATION.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.CUBOID_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.QUERY_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.EXECUTION_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.JOB_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.STAGE_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.IF_SUCCESS.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
QuerySparkStageEnum.SUBMIT_TIME.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, 
RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
         idx++;
 
         RowKeyDesc rowKeyDesc = new RowKeyDesc();
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[1][];
+        String[][] hierarchy_dims = new String[2][];
         hierarchy_dims[0] = getTimeHierarchy();
+        hierarchy_dims[1] = new String[4];
+        hierarchy_dims[1][0] = QuerySparkStageEnum.QUERY_ID.toString();
+        hierarchy_dims[1][1] = QuerySparkStageEnum.EXECUTION_ID.toString();
+        hierarchy_dims[1][2] = QuerySparkStageEnum.JOB_ID.toString();
+        hierarchy_dims[1][3] = QuerySparkStageEnum.STAGE_ID.toString();
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, 
hierarchy_dims[i]);
         }
@@ -447,7 +467,7 @@ public class CubeDescCreator {
         desc.setDimensions(dimensionDescList);
         desc.setMeasures(measureDescList);
         desc.setRowkey(rowKeyDesc);
-        desc.setHbaseMapping(hBaseMapping);
+        //desc.setHbaseMapping(hBaseMapping);
         desc.setNotifyList(Lists.<String> newArrayList());
         
desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 
2419200000L });
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
index 7d70bc2..e96da4d 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
@@ -34,7 +34,7 @@ public class CubeInstanceCreator {
     public static void main(String[] args) throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        CubeInstance cubeInstance = 
generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new 
MetricsSinkDesc());
+        CubeInstance cubeInstance = 
generateKylinCubeInstanceForMetricsQueryExecution("ADMIN", config, new 
MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         CubeManager.CUBE_SERIALIZER.serialize(cubeInstance, dout);
@@ -43,21 +43,21 @@ public class CubeInstanceCreator {
         System.out.println(buf.toString("UTF-8"));
     }
 
-    public static CubeInstance generateKylinCubeInstanceForMetricsQuery(String 
owner, KylinConfig config,
+    public static CubeInstance 
generateKylinCubeInstanceForMetricsQueryExecution(String owner, KylinConfig 
config,
             MetricsSinkDesc sinkDesc) {
-        return generateKylinCubeInstance(owner, 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()));
+        return generateKylinCubeInstance(owner, 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution()));
     }
 
-    public static CubeInstance 
generateKylinCubeInstanceForMetricsQueryCube(String owner, KylinConfig config,
+    public static CubeInstance 
generateKylinCubeInstanceForMetricsQuerySparkJob(String owner, KylinConfig 
config,
             MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()));
+                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob()));
     }
 
-    public static CubeInstance 
generateKylinCubeInstanceForMetricsQueryRPC(String owner, KylinConfig config,
+    public static CubeInstance 
generateKylinCubeInstanceForMetricsQuerySparkStage(String owner, KylinConfig 
config,
             MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()));
+                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage()));
     }
 
     public static CubeInstance generateKylinCubeInstanceForMetricsJob(String 
owner, KylinConfig config,
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 35d9efb..73e493e 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -29,10 +29,9 @@ import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord;
 import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
@@ -97,18 +96,18 @@ public class HiveTableCreator {
     }
 
     public static String generateHiveTableSQLForMetricsQuery(KylinConfig 
config) {
-        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuery());
-        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQuery(), getPartitionKVsForHiveTable());
+        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryExecution());
+        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQueryExecution(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsQueryCUBE(KylinConfig 
config) {
-        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryCube());
-        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQueryCube(), getPartitionKVsForHiveTable());
+        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkJob());
+        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQuerySparkJob(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsQueryRPC(KylinConfig 
config) {
-        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryRpcCall());
-        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQueryRPC(), getPartitionKVsForHiveTable());
+        String tableName = 
HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkStage());
+        return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsQuerySparkStage(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsJob(KylinConfig config) 
{
@@ -121,67 +120,94 @@ public class HiveTableCreator {
         return generateHiveTableSQL(tableName, 
getHiveColumnsForMetricsJobException(), getPartitionKVsForHiveTable());
     }
 
-    public static List<Pair<String, String>> getHiveColumnsForMetricsQuery() {
+    public static List<Pair<String, String>> 
getHiveColumnsForMetricsQueryExecution() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.add(new Pair<>(QueryPropertyEnum.ID_CODE.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.ID_CODE.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.QUERY_ID.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTION_ID.toString(), 
HiveTypeEnum.HBIGINT.toString()));
         columns.add(new 
Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), 
HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.USER.toString(), 
HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.PROJECT.toString(), 
HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.REALIZATION.toString(), 
HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.REALIZATION_TYPE.toString(), 
HiveTypeEnum.HINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.TYPE.toString(), 
HiveTypeEnum.HSTRING.toString()));
-
-        columns.add(new Pair<>(QueryPropertyEnum.EXCEPTION.toString(), 
HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.TIME_COST.toString(), 
HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new 
Pair<>(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), 
HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new 
Pair<>(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), 
HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.USER.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), 
HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), 
HiveTypeEnum.HSTRING.toString()));
+
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.START_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.END_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXCEPTION.toString(), 
HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TIME_COST.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.RESULT_COUNT.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_SIZE.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), 
HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new 
Pair<>(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), 
HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
     }
 
-    public static List<Pair<String, String>> 
getHiveColumnsForMetricsQueryCube() {
+    public static List<Pair<String, String>> 
getHiveColumnsForMetricsQuerySparkJob() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.add(new Pair<>(QuerySparkJobEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString()));
         columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBE.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SEGMENT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_TARGET.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.IF_MATCH.toString(), HiveTypeEnum.HBOOLEAN.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.FILTER_MASK.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
-
-        columns.add(new Pair<>(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), HiveTypeEnum.HDOUBLE.toString()));
-
-        columns.add(new Pair<>(QueryCubePropertyEnum.CALL_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.TIME_SUM.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.TIME_MAX.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.START_TIME.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.END_TIME.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
+
+        columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
     }
 
-    public static List<Pair<String, String>> getHiveColumnsForMetricsQueryRPC() {
+    public static List<Pair<String, String>> getHiveColumnsForMetricsQuerySparkStage() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.RPC_SERVER.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString()));
-
-        columns.add(new Pair<>(QueryRPCPropertyEnum.CALL_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.STAGE_ID.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.SUBMIT_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.CUBOID_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
+
+        columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
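
    [Note, not part of the patch] Each helper above returns (column name, Hive type) pairs that the system-cube tooling turns into Hive table columns. A minimal sketch of how such a pair list could be rendered into a column clause; the rendering loop here is illustrative only, the real DDL generation stays inside HiveTableCreator, and org.apache.kylin.common.util.Pair with getFirst()/getSecond() is assumed:

    import java.util.List;
    import org.apache.kylin.common.util.Pair;
    import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;

    public class ColumnClausePreview {
        public static void main(String[] args) {
            // (column name, Hive type string) pairs for the new Spark-stage metrics table
            List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage();
            StringBuilder clause = new StringBuilder();
            for (Pair<String, String> col : columns) {
                if (clause.length() > 0) {
                    clause.append(",\n");
                }
                // append "NAME TYPE", e.g. the stage metric name plus its Hive type string
                clause.append(col.getFirst()).append(" ").append(col.getSecond());
            }
            System.out.println(clause);
        }
    }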
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index 9636811..6d70a7e 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -39,7 +39,7 @@ public class KylinTableCreator {
     public static void main(String[] args) throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new MetricsSinkDesc());
+        TableDesc kylinTable = generateKylinTableForMetricsQueryExecution(config, new MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         TableMetadataManager.TABLE_SERIALIZER.serialize(kylinTable, dout);
@@ -48,25 +48,25 @@ public class KylinTableCreator {
         System.out.println(buf.toString("UTF-8"));
     }
 
-    public static TableDesc generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQueryExecution(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryExecution());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuery(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryExecution(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQuerySparkJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkJob(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQuerySparkStage(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkStage(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
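
    [Note, not part of the patch] A sketch of how the three renamed table generators could be exercised together, mirroring the serialization pattern that the main() in the hunk above already uses; it assumes a Kylin runtime where KylinConfig.getInstanceFromEnv() resolves a configuration:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.metadata.TableMetadataManager;
    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.kylin.tool.metrics.systemcube.KylinTableCreator;
    import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;

    public class SystemCubeTablePreview {
        public static void main(String[] args) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            MetricsSinkDesc sinkDesc = new MetricsSinkDesc();

            // The three query-side system-cube tables after the rename.
            TableDesc execution = KylinTableCreator.generateKylinTableForMetricsQueryExecution(config, sinkDesc);
            TableDesc sparkJob = KylinTableCreator.generateKylinTableForMetricsQuerySparkJob(config, sinkDesc);
            TableDesc sparkStage = KylinTableCreator.generateKylinTableForMetricsQuerySparkStage(config, sinkDesc);
            System.out.println(sparkJob.getIdentity() + " / " + sparkStage.getIdentity());

            // Serialize one of them, following the main() shown in the hunk above.
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(buf);
            TableMetadataManager.TABLE_SERIALIZER.serialize(execution, dout);
            dout.close();
            buf.close();
            System.out.println(buf.toString("UTF-8"));
        }
    }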
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
index 429509a..8a36549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
@@ -32,9 +32,9 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
@@ -65,23 +65,23 @@ public class ModelCreator {
 
     public static DataModelDesc generateKylinModelForMetricsQuery(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuery(), getMeasuresForMetricsQuery(),
-                getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryExecution());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryExecution(),
+                getMeasuresForMetricsQueryExecution(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryCube(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryCube(),
-                getMeasuresForMetricsQueryCube(), getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkJob());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkJob(),
+                getMeasuresForMetricsQuerySparkJob(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryRPC(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryRPC(), getMeasuresForMetricsQueryRPC(),
-                getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkStage());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkStage(),
+                getMeasuresForMetricsQuerySparkStage(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsJob(String owner, KylinConfig kylinConfig,
@@ -98,82 +98,106 @@ public class ModelCreator {
                 getMeasuresForMetricsJobException(), getPartitionDesc(tableName));
     }
 
-    public static List<String> getDimensionsForMetricsQuery() {
+    public static List<String> getDimensionsForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryPropertyEnum.USER.toString());
-        result.add(QueryPropertyEnum.PROJECT.toString());
-        result.add(QueryPropertyEnum.REALIZATION.toString());
-        result.add(QueryPropertyEnum.REALIZATION_TYPE.toString());
-        result.add(QueryPropertyEnum.TYPE.toString());
-        result.add(QueryPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.USER.toString());
+        result.add(QuerySparkExecutionEnum.PROJECT.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION_TYPE.toString());
+        result.add(QuerySparkExecutionEnum.CUBOID_IDS.toString());
+        result.add(QuerySparkExecutionEnum.TYPE.toString());
+        result.add(QuerySparkExecutionEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.SPARDER_NAME.toString());
+        result.add(QuerySparkExecutionEnum.QUERY_ID.toString());
+        result.add(QuerySparkExecutionEnum.START_TIME.toString());
+        result.add(QuerySparkExecutionEnum.END_TIME.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQuery() {
+    public static List<String> getMeasuresForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryPropertyEnum.ID_CODE.toString());
-        result.add(QueryPropertyEnum.TIME_COST.toString());
-        result.add(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.AGGR_FILTER_COUNT.toString());
-
+        result.add(QuerySparkExecutionEnum.ID_CODE.toString());
+        result.add(QuerySparkExecutionEnum.TIME_COST.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTION_DURATION.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString());
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryCube() {
+    public static List<String> getDimensionsForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryCubePropertyEnum.PROJECT.toString());
-        result.add(QueryCubePropertyEnum.CUBE.toString());
-        result.add(QueryCubePropertyEnum.SEGMENT.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_SOURCE.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_TARGET.toString());
-        result.add(QueryCubePropertyEnum.FILTER_MASK.toString());
-        result.add(QueryCubePropertyEnum.IF_MATCH.toString());
-        result.add(QueryCubePropertyEnum.IF_SUCCESS.toString());
+        result.add(QuerySparkJobEnum.QUERY_ID.toString());
+        result.add(QuerySparkJobEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkJobEnum.JOB_ID.toString());
+        result.add(QuerySparkJobEnum.PROJECT.toString());
+        result.add(QuerySparkJobEnum.START_TIME.toString());
+        result.add(QuerySparkJobEnum.END_TIME.toString());
+        result.add(QuerySparkJobEnum.IF_SUCCESS.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryCube() {
+    public static List<String> getMeasuresForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString());
-        result.add(QueryCubePropertyEnum.CALL_COUNT.toString());
-        result.add(QueryCubePropertyEnum.TIME_SUM.toString());
-        result.add(QueryCubePropertyEnum.TIME_MAX.toString());
-        result.add(QueryCubePropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryCubePropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkJobEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString());
 
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryRPC() {
+    public static List<String> getDimensionsForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryRPCPropertyEnum.PROJECT.toString());
-        result.add(QueryRPCPropertyEnum.REALIZATION.toString());
-        result.add(QueryRPCPropertyEnum.RPC_SERVER.toString());
-        result.add(QueryRPCPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkStageEnum.QUERY_ID.toString());
+        result.add(QuerySparkStageEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkStageEnum.JOB_ID.toString());
+        result.add(QuerySparkStageEnum.STAGE_ID.toString());
+        result.add(QuerySparkStageEnum.SUBMIT_TIME.toString());
+        result.add(QuerySparkStageEnum.PROJECT.toString());
+        result.add(QuerySparkStageEnum.REALIZATION.toString());
+        result.add(QuerySparkStageEnum.CUBOID_ID.toString());
+        result.add(QuerySparkStageEnum.IF_SUCCESS.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryRPC() {
+    public static List<String> getMeasuresForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryRPCPropertyEnum.CALL_TIME.toString());
-        result.add(QueryRPCPropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkStageEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString());
 
         return result;
     }
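
    [Note, not part of the patch] A quick way to eyeball the dimension/measure split that the refactored Spark-stage model is assembled from; the two helpers are the public static methods added above, everything else is illustrative:

    import java.util.List;
    import org.apache.kylin.tool.metrics.systemcube.ModelCreator;

    public class SparkStageModelColumnsPreview {
        public static void main(String[] args) {
            // Dimensions: QUERY_ID, EXECUTION_ID, JOB_ID, STAGE_ID, SUBMIT_TIME, PROJECT, REALIZATION, ...
            List<String> dimensions = ModelCreator.getDimensionsForMetricsQuerySparkStage();
            // Measures: RESULT_SIZE, EXECUTOR_RUN_TIME, JVM_GC_TIME, PEAK_EXECUTION_MEMORY, ...
            List<String> measures = ModelCreator.getMeasuresForMetricsQuerySparkStage();
            System.out.println("dimensions: " + dimensions);
            System.out.println("measures:   " + measures);
        }
    }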
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
index 17fde95..3322677 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
@@ -186,9 +186,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<TableDesc> generateKylinTableForSystemCube(MetricsSinkDesc sinkDesc) {
         List<TableDesc> result = Lists.newLinkedList();
-        result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQueryExecution(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkStage(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkDesc));
 
@@ -208,9 +208,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeDesc> generateKylinCubeDescForSystemCube(MetricsSinkDesc sinkDesc) {
         List<CubeDesc> result = Lists.newLinkedList();
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryExecution(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkStage(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkDesc));
 
@@ -219,9 +219,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, MetricsSinkDesc sinkDesc) {
         List<CubeInstance> result = Lists.newLinkedList();
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryExecution(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkJob(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkStage(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkDesc));
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
index c495919..50ab091 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
@@ -25,9 +25,9 @@ public class KafkaTopicCreator {
     public static String generateCreateCommand(KylinConfig config) {
         StringBuilder sb = new StringBuilder();
         String[] topics = new String[]{
-                config.getKylinMetricsSubjectQuery(),
-                config.getKylinMetricsSubjectQueryCube(),
-                config.getKylinMetricsSubjectQueryRpcCall(),
+                config.getKylinMetricsSubjectQueryExecution(),
+                config.getKylinMetricsSubjectQuerySparkJob(),
+                config.getKylinMetricsSubjectQuerySparkStage(),
                 config.getKylinMetricsSubjectJob(),
                 config.getKylinMetricsSubjectJobException()};
         for (String topic : topics) {

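    [Note, not part of the patch] A hypothetical sketch of the per-subject topic-creation commands that such a generator targets after the rename. It is not the output of generateCreateCommand(); the kafka-topics.sh flags, partition and replication values, and broker address are placeholders, while the five subject getters are the ones used in the hunk above:

    import org.apache.kylin.common.KylinConfig;

    public class MetricsTopicCommandSketch {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            String[] topics = new String[] {
                    config.getKylinMetricsSubjectQueryExecution(),
                    config.getKylinMetricsSubjectQuerySparkJob(),
                    config.getKylinMetricsSubjectQuerySparkStage(),
                    config.getKylinMetricsSubjectJob(),
                    config.getKylinMetricsSubjectJobException() };
            for (String topic : topics) {
                // One create command per metrics subject; flags and values are illustrative only.
                System.out.println("kafka-topics.sh --create --topic " + topic
                        + " --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092");
            }
        }
    }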