This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new a2e9bfa  KYLIN-4516 support System cube
a2e9bfa is described below

commit a2e9bfa52fe13c7a0b22cd8fd24f1b00ab4a934c
Author: rupengwang <wangrup...@live.cn>
AuthorDate: Sun May 31 10:25:26 2020 +0800

    KYLIN-4516 support System cube
---
 .../java/org/apache/kylin/metadata/MetadataConstants.java   |  3 +++
 .../org/apache/kylin/engine/spark/job/NSparkCubingJob.java  |  8 ++++----
 .../org/apache/kylin/engine/spark/job/NSparkCubingStep.java | 13 +++++++++++++
 .../org/apache/kylin/engine/spark/job/NSparkExecutable.java | 11 ++++++++++-
 .../org/apache/kylin/engine/spark/job/NSparkMergingJob.java |  5 ++---
 .../org/apache/kylin/query/runtime/plans/ResultPlan.scala   |  4 ++--
 .../kylin/tool/metrics/systemcube/CubeDescCreator.java      |  2 +-
 7 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index fd8e32a..57801b6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -56,4 +56,7 @@ public interface MetadataConstants {
     public static final String TABLE_EXD_CARDINALITY = "cardinality";
     public static final String TABLE_EXD_DELIM = "delim";
     public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+
+    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index d6dab06..f4e244f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
-import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,10 +53,10 @@ public class NSparkCubingJob extends CubingJob {
 
     // for test use only
     public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
-        return create(segments, submitter, JobTypeEnum.INDEX_BUILD, UUID.randomUUID().toString());
+        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
     }
 
-    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, JobTypeEnum jobType,
+    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
             String jobId) {
         Preconditions.checkArgument(!segments.isEmpty());
         Preconditions.checkArgument(submitter != null);
@@ -80,7 +79,6 @@ public class NSparkCubingJob extends CubingJob {
         job.setId(jobId);
         job.setName(builder.toString());
         job.setProjectName(job.cube.getProject());
-        job.setJobType(jobType);
         job.setTargetSubject(job.cube.getModel().getId());
         job.setTargetSegments(segments.stream().map(x -> String.valueOf(x.getUuid())).collect(Collectors.toList()));
         job.setProject(job.cube.getProject());
@@ -98,6 +96,8 @@ public class NSparkCubingJob extends CubingJob {
         job.setParam(MetadataConstants.P_OUTPUT_META_URL, job.cube.getConfig().getMetadataUrl().toString());
         job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(job.cube.getDescriptor().getAllCuboids().size()));
+        //set param for job metrics
+        job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString());
 
         JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, job.cube);
         JobStepFactory.addStep(job, JobStepType.CUBING, job.cube);
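The net effect of the NSparkCubingJob hunks above is that the job type ("BUILD") now travels with the job as an ordinary parameter instead of through the removed setJobType call, so metrics code can recover it from the parameter map. A minimal sketch of that read-back, assuming AbstractExecutable's public getParams() accessor; the JobTypeReader class itself is hypothetical:

    import java.util.Map;

    import org.apache.kylin.job.execution.AbstractExecutable;
    import org.apache.kylin.metadata.MetadataConstants;

    // Hypothetical helper: recover the job type that NSparkCubingJob stored
    // via job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString()).
    public class JobTypeReader {
        public static String readJobType(AbstractExecutable job) {
            Map<String, String> params = job.getParams();
            return params.get(MetadataConstants.P_JOB_TYPE); // e.g. "BUILD" or "MERGE"
        }
    }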
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index 4c2fcb9..bdc68c6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -19,9 +19,11 @@ package org.apache.kylin.engine.spark.job;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -106,4 +109,14 @@ public class NSparkCubingStep extends NSparkExecutable {
         }
         cubeManager.updateCube(update);
     }
+
+    @Override
+    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(getCubeName());
+        Map<String, String> joblogInfo = Maps.newHashMap();
+        joblogInfo.put(CubingJob.SOURCE_SIZE_BYTES, String.valueOf(cube.getInputRecordSizeBytes()));
+        joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB()));
+        return joblogInfo;
+    }
 }
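The override above is one half of a small template-method pattern: the base class (NSparkExecutable, next diff) supplies a getJobMetricsInfo hook that returns an empty map, and each concrete step overrides it with the figures it can actually report. A self-contained sketch of the pattern, with illustrative class names, keys, and hard-coded values standing in for the real Kylin types:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only; SparkStep/CubingStep are stand-ins, not the Kylin classes.
    abstract class SparkStep {
        // Mirrors NSparkExecutable.getJobMetricsInfo: nothing to report by default.
        protected Map<String, String> getJobMetricsInfo() {
            return new HashMap<>();
        }
    }

    class CubingStep extends SparkStep {
        @Override
        protected Map<String, String> getJobMetricsInfo() {
            // A concrete step reports the figures it knows; illustrative keys
            // here, where the real step uses the CubingJob constants above.
            Map<String, String> info = new HashMap<>();
            info.put("sourceSizeBytes", String.valueOf(1024L));
            info.put("cubeSizeBytes", String.valueOf(2048L));
            return info;
        }
    }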
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7923605..bfab276 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -255,11 +255,13 @@ public class NSparkExecutable extends AbstractExecutable {
             CliCommandExecutor exec = new CliCommandExecutor();
             Pair<Integer, String> result = exec.execute(cmd, patternedLogger, jobId);
 
+            updateMetaAfterBuilding(config);
+            //Add metrics information to execute result for JobMetricsFacade
+            getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
             ExecuteResult ret = ExecuteResult.createSucceed(result.getSecond());
             ret.getExtraInfo().putAll(extraInfo);
-            updateMetaAfterBuilding(config);
             return ret;
         } catch (Exception e) {
             return ExecuteResult.createError(e);
@@ -269,6 +271,10 @@ public class NSparkExecutable extends AbstractExecutable {
     protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
     }
 
+    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+        return Maps.newHashMap();
+    }
+
     protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
         Map<String, String> sparkConfigOverride = config.getSparkConfigOverride();
         if (!sparkConfigOverride.containsKey("spark.driver.memory")) {
@@ -335,6 +341,9 @@ public class NSparkExecutable extends AbstractExecutable {
             Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
             appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
             updateMetaAfterBuilding(config);
+
+            //Add metrics information to execute result for JobMetricsFacade
+            getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             return ExecuteResult.createSucceed();
         } catch (Exception e) {
             return ExecuteResult.createError(e);
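Both execution paths (spark-submit and in-process) now persist the step's metrics through getManager().addJobInfo(...) right after the metadata update, so the figures are in place before the result is returned. As a rough illustration of the consumer side, a sketch that reads those entries back through ExecutableManager, assuming the usual getOutput(...).getExtra() accessors; the probe class is hypothetical:

    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.execution.ExecutableManager;
    import org.apache.kylin.job.execution.Output;

    // Hypothetical probe: addJobInfo merges entries into the job output's
    // extra-info map, which is what JobMetricsFacade-style consumers read.
    public class JobMetricsProbe {
        public static void printMetrics(String jobId) {
            ExecutableManager manager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
            Output output = manager.getOutput(jobId);
            for (Map.Entry<String, String> e : output.getExtra().entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        }
    }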
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index 68d4ee2..25de377 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -44,7 +44,7 @@ public class NSparkMergingJob extends CubingJob {
     private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
 
     public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
-        return NSparkMergingJob.merge(mergedSegment, submitter, JobTypeEnum.INDEX_MERGE, UUID.randomUUID().toString());
+        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
     }
 
     /**
@@ -52,7 +52,7 @@ public class NSparkMergingJob extends CubingJob {
      *
      * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
      */
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, JobTypeEnum jobType, String jobId) {
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
         CubeInstance cube = mergedSegment.getCubeInstance();
 
         NSparkMergingJob job = new NSparkMergingJob();
@@ -70,7 +70,6 @@ public class NSparkMergingJob extends CubingJob {
         job.setTargetSubject(mergedSegment.getModel().getUuid());
         job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
         job.setProject(mergedSegment.getProject());
-        job.setJobType(jobType);
         job.setSubmitter(submitter);
 
         job.setParam(MetadataConstants.P_JOB_ID, jobId);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f0077af..ef9cd54 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -93,8 +93,8 @@ object ResultPlan extends Logging {
     try {
       val rows = df.collect()
       val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
-      // QueryContextFacade.current().setScanRows(scanRows)
-      // QueryContextFacade.current().setScanBytes(scanBytes)
+      QueryContextFacade.current().addAndGetScannedRows(scanRows.iterator().next())
+      QueryContextFacade.current().addAndGetScannedBytes(scanBytes.iterator().next())
       val dt = rows.map { row =>
         var rowIndex = 0
         row.toSeq.map { cell => {
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 5f64da0..264596b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -451,7 +451,7 @@ public class CubeDescCreator {
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
-        desc.setEngineType(IEngineAware.ID_MR_V2);
+        desc.setEngineType(IEngineAware.ID_SPARK_II);
         desc.setStorageType(storageType);
         desc.setAggregationGroups(Lists.newArrayList(aggGroup));
         desc.getOverrideKylinProps().putAll(overrideProperties);
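One note on the ResultPlan.scala hunk above: the previously commented-out setScanRows/setScanBytes setters are replaced with addAndGet-style calls, so scan metrics from the collected plan accumulate in the current query context rather than overwrite each other. A self-contained Java sketch of that accumulation semantics (illustrative type, not the real QueryContext):

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative stand-in for the query-context counters: each call adds
    // the new sample and returns the running total, so successive plan
    // fragments accumulate instead of overwriting one another.
    class ScanMetrics {
        private final AtomicLong scannedRows = new AtomicLong();
        private final AtomicLong scannedBytes = new AtomicLong();

        long addAndGetScannedRows(long rows) {
            return scannedRows.addAndGet(rows);
        }

        long addAndGetScannedBytes(long bytes) {
            return scannedBytes.addAndGet(bytes);
        }
    }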