This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new a2e9bfa  KYLIN-4516 support System cube
a2e9bfa is described below

commit a2e9bfa52fe13c7a0b22cd8fd24f1b00ab4a934c
Author: rupengwang <wangrup...@live.cn>
AuthorDate: Sun May 31 10:25:26 2020 +0800

    KYLIN-4516 support System cube
---
 .../java/org/apache/kylin/metadata/MetadataConstants.java   |  3 +++
 .../org/apache/kylin/engine/spark/job/NSparkCubingJob.java  |  8 ++++----
 .../org/apache/kylin/engine/spark/job/NSparkCubingStep.java | 13 +++++++++++++
 .../org/apache/kylin/engine/spark/job/NSparkExecutable.java | 11 ++++++++++-
 .../org/apache/kylin/engine/spark/job/NSparkMergingJob.java |  5 ++---
 .../org/apache/kylin/query/runtime/plans/ResultPlan.scala   |  4 ++--
 .../kylin/tool/metrics/systemcube/CubeDescCreator.java      |  2 +-
 7 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index fd8e32a..57801b6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -56,4 +56,7 @@ public interface MetadataConstants {
     public static final String TABLE_EXD_CARDINALITY = "cardinality";
     public static final String TABLE_EXD_DELIM = "delim";
     public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+
+    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+
 }
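Note on the change above: SOURCE_RECORD_COUNT gives engine steps a shared key for reporting input row counts. A minimal sketch of how a step might publish a count under it (the helper class and call pattern are illustrative assumptions, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kylin.metadata.MetadataConstants;

    // Hypothetical helper: publishes an input row count under the shared key.
    public class SourceCountReporter {
        public static Map<String, String> report(long recordCount) {
            Map<String, String> info = new HashMap<>();
            info.put(MetadataConstants.SOURCE_RECORD_COUNT, String.valueOf(recordCount));
            return info;
        }
    }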
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index d6dab06..f4e244f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
-import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,10 +53,10 @@ public class NSparkCubingJob extends CubingJob {
 
     // for test use only
     public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
-        return create(segments, submitter, JobTypeEnum.INDEX_BUILD, UUID.randomUUID().toString());
+        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
     }
 
-    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, JobTypeEnum jobType,
+    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
             String jobId) {
         Preconditions.checkArgument(!segments.isEmpty());
         Preconditions.checkArgument(submitter != null);
@@ -80,7 +79,6 @@ public class NSparkCubingJob extends CubingJob {
         job.setId(jobId);
         job.setName(builder.toString());
         job.setProjectName(job.cube.getProject());
-        job.setJobType(jobType);
         job.setTargetSubject(job.cube.getModel().getId());
         job.setTargetSegments(segments.stream().map(x -> String.valueOf(x.getUuid())).collect(Collectors.toList()));
         job.setProject(job.cube.getProject());
@@ -98,6 +96,8 @@ public class NSparkCubingJob extends CubingJob {
         job.setParam(MetadataConstants.P_OUTPUT_META_URL, job.cube.getConfig().getMetadataUrl().toString());
         job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(job.cube.getDescriptor().getAllCuboids().size()));
 
+        //set param for job metrics
+        job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString());
         JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, job.cube);
         JobStepFactory.addStep(job, JobStepType.CUBING, job.cube);
 
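With the change above, the job type is no longer stored as a typed field (setJobType is gone); it travels as an ordinary job parameter under MetadataConstants.P_JOB_TYPE so metrics code can read it back from the params map. A hedged sketch of the read side (the reader class is an assumption, not this commit's code):

    import java.util.Map;

    import org.apache.kylin.metadata.MetadataConstants;

    // Illustrative reader for the parameter written above by
    // job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString()).
    public class JobTypeReader {
        public static String jobTypeOf(Map<String, String> jobParams) {
            return jobParams.get(MetadataConstants.P_JOB_TYPE);
        }
    }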
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index 4c2fcb9..bdc68c6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -19,9 +19,11 @@
 package org.apache.kylin.engine.spark.job;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -106,4 +109,14 @@ public class NSparkCubingStep extends NSparkExecutable {
         }
         cubeManager.updateCube(update);
     }
+
+    @Override
+    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(getCubeName());
+        Map<String, String> joblogInfo = Maps.newHashMap();
+        joblogInfo.put(CubingJob.SOURCE_SIZE_BYTES, String.valueOf(cube.getInputRecordSizeBytes()));
+        joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB()));
+        return joblogInfo;
+    }
 }
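The override above reuses CubingJob's existing SOURCE_SIZE_BYTES and CUBE_SIZE_BYTES keys, so the metrics facade can treat Spark builds the same way as MR builds. A small illustrative consumer (not from this commit) that assumes both values are numeric strings:

    import java.util.Map;

    import org.apache.kylin.engine.mr.CubingJob;

    // Illustrative consumer of the map returned by getJobMetricsInfo() above.
    public class JobMetricsSanity {
        public static void verify(Map<String, String> metrics) {
            long sourceBytes = Long.parseLong(metrics.get(CubingJob.SOURCE_SIZE_BYTES));
            long cubeSize = Long.parseLong(metrics.get(CubingJob.CUBE_SIZE_BYTES));
            if (sourceBytes < 0 || cubeSize < 0) {
                throw new IllegalStateException("negative job size metric");
            }
        }
    }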
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7923605..bfab276 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -255,11 +255,13 @@ public class NSparkExecutable extends AbstractExecutable {
 
             CliCommandExecutor exec = new CliCommandExecutor();
             Pair<Integer, String> result = exec.execute(cmd, patternedLogger, jobId);
+            updateMetaAfterBuilding(config);
+            //Add metrics information to execute result for JobMetricsFacade
 
+            getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
             ExecuteResult ret = ExecuteResult.createSucceed(result.getSecond());
             ret.getExtraInfo().putAll(extraInfo);
-            updateMetaAfterBuilding(config);
             return ret;
         } catch (Exception e) {
             return ExecuteResult.createError(e);
@@ -269,6 +271,10 @@ public class NSparkExecutable extends AbstractExecutable {
     protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
     }
 
+    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+        return Maps.newHashMap();
+    }
+
     protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
         Map<String, String> sparkConfigOverride = config.getSparkConfigOverride();
         if (!sparkConfigOverride.containsKey("spark.driver.memory")) {
@@ -335,6 +341,9 @@ public class NSparkExecutable extends AbstractExecutable {
             Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
             appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
             updateMetaAfterBuilding(config);
+
+            //Add metrics information to execute result for JobMetricsFacade
+            getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             return ExecuteResult.createSucceed();
         } catch (Exception e) {
             return ExecuteResult.createError(e);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index 68d4ee2..25de377 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -44,7 +44,7 @@ public class NSparkMergingJob extends CubingJob {
     private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
 
     public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
-        return NSparkMergingJob.merge(mergedSegment, submitter, JobTypeEnum.INDEX_MERGE, UUID.randomUUID().toString());
+        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
     }
 
     /**
@@ -52,7 +52,7 @@ public class NSparkMergingJob extends CubingJob {
      *
      * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
      */
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, JobTypeEnum jobType, String jobId) {
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
         CubeInstance cube = mergedSegment.getCubeInstance();
 
         NSparkMergingJob job = new NSparkMergingJob();
@@ -70,7 +70,6 @@ public class NSparkMergingJob extends CubingJob {
         job.setTargetSubject(mergedSegment.getModel().getUuid());
         job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
         job.setProject(mergedSegment.getProject());
-        job.setJobType(jobType);
         job.setSubmitter(submitter);
 
         job.setParam(MetadataConstants.P_JOB_ID, jobId);
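Both factory methods in this commit now take CubingJobTypeEnum, using BUILD for cubing jobs and MERGE for merging jobs. The enum's definition is not part of this diff; a plausible minimal shape, stated as an assumption:

    // Assumed shape of the enum used above; the real definition is outside this diff.
    public enum CubingJobTypeEnum {
        BUILD, MERGE
    }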
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f0077af..ef9cd54 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -93,8 +93,8 @@ object ResultPlan extends Logging {
     try {
       val rows = df.collect()
       val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
-      //      QueryContextFacade.current().setScanRows(scanRows)
-      //      QueryContextFacade.current().setScanBytes(scanBytes)
+      QueryContextFacade.current().addAndGetScannedRows(scanRows.iterator().next())
+      QueryContextFacade.current().addAndGetScannedBytes(scanBytes.iterator().next())
       val dt = rows.map { row =>
         var rowIndex = 0
         row.toSeq.map { cell => {
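The Scala change above replaces the commented-out setScanRows/setScanBytes stubs with accumulate-and-get calls on the current query context. A minimal Java sketch of that counter style (this class is illustrative, not Kylin's QueryContextFacade):

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative accumulate-and-get counters in the style of
    // addAndGetScannedRows / addAndGetScannedBytes used above.
    public class ScanCounters {
        private final AtomicLong scannedRows = new AtomicLong();
        private final AtomicLong scannedBytes = new AtomicLong();

        public long addAndGetScannedRows(long delta) {
            return scannedRows.addAndGet(delta);
        }

        public long addAndGetScannedBytes(long delta) {
            return scannedBytes.addAndGet(delta);
        }
    }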
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 5f64da0..264596b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -451,7 +451,7 @@ public class CubeDescCreator {
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
-        desc.setEngineType(IEngineAware.ID_MR_V2);
+        desc.setEngineType(IEngineAware.ID_SPARK_II);
         desc.setStorageType(storageType);
         desc.setAggregationGroups(Lists.newArrayList(aggGroup));
         desc.getOverrideKylinProps().putAll(overrideProperties);