This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new a2e9bfa KYLIN-4516 support System cube
a2e9bfa is described below
commit a2e9bfa52fe13c7a0b22cd8fd24f1b00ab4a934c
Author: rupengwang <[email protected]>
AuthorDate: Sun May 31 10:25:26 2020 +0800
KYLIN-4516 support System cube
---
.../java/org/apache/kylin/metadata/MetadataConstants.java | 3 +++
.../org/apache/kylin/engine/spark/job/NSparkCubingJob.java | 8 ++++----
.../org/apache/kylin/engine/spark/job/NSparkCubingStep.java | 13 +++++++++++++
.../org/apache/kylin/engine/spark/job/NSparkExecutable.java | 11 ++++++++++-
.../org/apache/kylin/engine/spark/job/NSparkMergingJob.java | 5 ++---
.../org/apache/kylin/query/runtime/plans/ResultPlan.scala | 4 ++--
.../kylin/tool/metrics/systemcube/CubeDescCreator.java | 2 +-
7 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index fd8e32a..57801b6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -56,4 +56,7 @@ public interface MetadataConstants {
public static final String TABLE_EXD_CARDINALITY = "cardinality";
public static final String TABLE_EXD_DELIM = "delim";
public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+
+ public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+
}
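For orientation, a minimal usage sketch of the new constant, in the spirit of the job.setParam(...) calls later in this commit; the job variable and the count value are placeholders, not part of the diff:

    // Hypothetical sketch: record a build's source record count on a job,
    // keyed by the new MetadataConstants.SOURCE_RECORD_COUNT constant.
    long sourceRecordCount = 123456L; // placeholder value
    job.setParam(MetadataConstants.SOURCE_RECORD_COUNT, String.valueOf(sourceRecordCount));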
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index d6dab06..f4e244f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
-import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,10 +53,10 @@ public class NSparkCubingJob extends CubingJob {
// for test use only
public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
- return create(segments, submitter, JobTypeEnum.INDEX_BUILD, UUID.randomUUID().toString());
+ return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
}
- public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, JobTypeEnum jobType,
+ public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
String jobId) {
Preconditions.checkArgument(!segments.isEmpty());
Preconditions.checkArgument(submitter != null);
@@ -80,7 +79,6 @@ public class NSparkCubingJob extends CubingJob {
job.setId(jobId);
job.setName(builder.toString());
job.setProjectName(job.cube.getProject());
- job.setJobType(jobType);
job.setTargetSubject(job.cube.getModel().getId());
job.setTargetSegments(segments.stream().map(x -> String.valueOf(x.getUuid())).collect(Collectors.toList()));
job.setProject(job.cube.getProject());
@@ -98,6 +96,8 @@ public class NSparkCubingJob extends CubingJob {
job.setParam(MetadataConstants.P_OUTPUT_META_URL, job.cube.getConfig().getMetadataUrl().toString());
job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(job.cube.getDescriptor().getAllCuboids().size()));
+ //set param for job metrics
+ job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString());
JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, job.cube);
JobStepFactory.addStep(job, JobStepType.CUBING, job.cube);
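Note that CubingJobTypeEnum itself is not defined in this diff; only its BUILD and MERGE values appear in this commit. A minimal sketch of what the enum might look like, stated purely as an assumption:

    // Hypothetical sketch of the enum referenced above; only the two
    // values this commit actually uses (BUILD, MERGE) are shown.
    public enum CubingJobTypeEnum {
        BUILD,
        MERGE
    }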
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index 4c2fcb9..bdc68c6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -19,9 +19,11 @@
package org.apache.kylin.engine.spark.job;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -37,6 +39,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
/**
@@ -106,4 +109,14 @@ public class NSparkCubingStep extends NSparkExecutable {
}
cubeManager.updateCube(update);
}
+
+ @Override
+ protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cube = cubeManager.getCube(getCubeName());
+ Map<String, String> joblogInfo = Maps.newHashMap();
+ joblogInfo.put(CubingJob.SOURCE_SIZE_BYTES, String.valueOf(cube.getInputRecordSizeBytes()));
+ joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB()));
+ return joblogInfo;
+ }
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7923605..bfab276 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -255,11 +255,13 @@ public class NSparkExecutable extends AbstractExecutable {
CliCommandExecutor exec = new CliCommandExecutor();
Pair<Integer, String> result = exec.execute(cmd, patternedLogger, jobId);
+ updateMetaAfterBuilding(config);
+ //Add metrics information to execute result for JobMetricsFacade
+ getManager().addJobInfo(getId(), getJobMetricsInfo(config));
Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
ExecuteResult ret = ExecuteResult.createSucceed(result.getSecond());
ret.getExtraInfo().putAll(extraInfo);
- updateMetaAfterBuilding(config);
return ret;
} catch (Exception e) {
return ExecuteResult.createError(e);
@@ -269,6 +271,10 @@ public class NSparkExecutable extends AbstractExecutable {
protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
}
+ protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+ return Maps.newHashMap();
+ }
+
protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
Map<String, String> sparkConfigOverride = config.getSparkConfigOverride();
if (!sparkConfigOverride.containsKey("spark.driver.memory")) {
@@ -335,6 +341,9 @@ public class NSparkExecutable extends AbstractExecutable {
Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
updateMetaAfterBuilding(config);
+
+ //Add metrics information to execute result for JobMetricsFacade
+ getManager().addJobInfo(getId(), getJobMetricsInfo(config));
return ExecuteResult.createSucceed();
} catch (Exception e) {
return ExecuteResult.createError(e);
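With this change, any NSparkExecutable subclass can publish build metrics by overriding getJobMetricsInfo; after a successful run the map is attached via getManager().addJobInfo(getId(), ...), as both hunks above show. A hedged sketch of a hypothetical override, following the pattern NSparkCubingStep uses; the key and value are placeholders, not part of this commit:

    // Hypothetical subclass sketch; Maps is com.google.common.collect.Maps.
    @Override
    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
        Map<String, String> info = Maps.newHashMap();
        info.put("myCustomMetric", "42"); // placeholder key/value
        return info;
    }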
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index 68d4ee2..25de377 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -44,7 +44,7 @@ public class NSparkMergingJob extends CubingJob {
private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
- return NSparkMergingJob.merge(mergedSegment, submitter, JobTypeEnum.INDEX_MERGE, UUID.randomUUID().toString());
+ return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
}
/**
@@ -52,7 +52,7 @@ public class NSparkMergingJob extends CubingJob {
*
* @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
*/
- public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, JobTypeEnum jobType, String jobId) {
+ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
CubeInstance cube = mergedSegment.getCubeInstance();
NSparkMergingJob job = new NSparkMergingJob();
@@ -70,7 +70,6 @@ public class NSparkMergingJob extends CubingJob {
job.setTargetSubject(mergedSegment.getModel().getUuid());
job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
job.setProject(mergedSegment.getProject());
- job.setJobType(jobType);
job.setSubmitter(submitter);
job.setParam(MetadataConstants.P_JOB_ID, jobId);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f0077af..ef9cd54 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -93,8 +93,8 @@ object ResultPlan extends Logging {
try {
val rows = df.collect()
val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
- // QueryContextFacade.current().setScanRows(scanRows)
- // QueryContextFacade.current().setScanBytes(scanBytes)
+ QueryContextFacade.current().addAndGetScannedRows(scanRows.iterator().next())
+ QueryContextFacade.current().addAndGetScannedBytes(scanBytes.iterator().next())
val dt = rows.map { row =>
var rowIndex = 0
row.toSeq.map { cell => {
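The previously commented-out setScanRows/setScanBytes calls are replaced with accumulating addAndGetScannedRows/addAndGetScannedBytes calls. A minimal Java sketch of the add-and-get counter pattern those names imply, assuming AtomicLong-style fields; the actual QueryContext internals are not part of this diff:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical counters matching the add-and-get naming above.
    class QueryCounters {
        private final AtomicLong scannedRows = new AtomicLong();
        private final AtomicLong scannedBytes = new AtomicLong();

        long addAndGetScannedRows(long delta) {
            return scannedRows.addAndGet(delta);
        }

        long addAndGetScannedBytes(long delta) {
            return scannedBytes.addAndGet(delta);
        }
    }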
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 5f64da0..264596b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -451,7 +451,7 @@ public class CubeDescCreator {
desc.setNotifyList(Lists.<String> newArrayList());
desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
- desc.setEngineType(IEngineAware.ID_MR_V2);
+ desc.setEngineType(IEngineAware.ID_SPARK_II);
desc.setStorageType(storageType);
desc.setAggregationGroups(Lists.newArrayList(aggGroup));
desc.getOverrideKylinProps().putAll(overrideProperties);