This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 2edeaea  KYLIN-4996 Merge cuboid statistics in merge job for kylin4
2edeaea is described below

commit 2edeaea625ce5a1e8bfbc73a72fbafccbfb420ff
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Wed May 19 15:27:25 2021 +0800

    KYLIN-4996 Merge cuboid statistics in merge job for kylin4
---
 .../kylin/job/constant/ExecutableConstants.java    |   2 +-
 .../kylin/job/execution/ExecutableManager.java     |   7 +-
 .../kylin/engine/spark/job/JobStepFactory.java     |   3 +
 .../apache/kylin/engine/spark/job/JobStepType.java |   2 +-
 .../spark/job/NSparkMergeStatisticsStep.java       | 162 +++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkMergingJob.java   |   3 +
 .../org/apache/kylin/rest/service/JobService.java  |   6 +-
 7 files changed, 176 insertions(+), 9 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 0d5e482..4d079d8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -92,7 +92,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 
-    //kylin on parquetv2
+    //kylin on parquet v2
     public static final String STEP_NAME_DETECT_RESOURCE = "Detect Resource";
     public static final String STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID = "Build recommend cuboid from parent cuboid";

diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 1c9e81c..2643b86 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -271,6 +271,9 @@ public class ExecutableManager {
         AbstractExecutable jobInstance = getJob(jobId);
         String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
         ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+        if (jobOutput == null) {
+            return null;
+        }
         assertOutputNotNull(jobOutput, outputStorePath);
 
         if (Objects.nonNull(jobOutput.getLogPath())) {
@@ -291,9 +294,7 @@ public class ExecutableManager {
             Path path = new Path(resPath);
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             if (!fs.exists(path)) {
-                ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-                executableOutputPO.setContent("job output not found, please check kylin.log");
-                return executableOutputPO;
+                return null;
             }
 
             din = fs.open(path);
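With this change a missing HDFS output file surfaces as null instead of a placeholder ExecutableOutputPO carrying "job output not found", so callers must now supply their own fallback. A minimal sketch of the expected calling pattern; ExecutableManager and the method names come from this commit, while the helper class is illustrative and the Output return type is assumed from the JobService usage at the end of this commit:

    import org.apache.kylin.job.execution.ExecutableManager;
    import org.apache.kylin.job.execution.Output;

    // Illustrative helper, not part of the commit.
    class StepOutputHelper {
        static String stepOutputOrFallback(ExecutableManager mgr, String jobId, String stepId) {
            Output hdfsOutput = mgr.getOutputFromHDFSByJobId(jobId, stepId);
            if (hdfsOutput == null) {
                // No output file on HDFS yet: fall back to the metadata store.
                return mgr.getOutput(stepId).getVerboseMsg();
            }
            return hdfsOutput.getVerboseMsg();
        }
    }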
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index ead4223..669ca65 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -45,6 +45,9 @@ public class JobStepFactory {
             case OPTIMIZING:
                 step = new NSparkOptimizingStep(OptimizeBuildJob.class.getName());
                 break;
+            case MERGE_STATISTICS:
+                step = new NSparkMergeStatisticsStep();
+                break;
             case CLEAN_UP_AFTER_MERGE:
                 step = new NSparkUpdateMetaAndCleanupAfterMergeStep();
                 break;

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
index 3b4142d..3dbfee2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.job;
 
 public enum JobStepType {
     RESOURCE_DETECT,
-    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, OPTIMIZING,
+    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, MERGE_STATISTICS, OPTIMIZING,
     FILTER_RECOMMEND_CUBOID
 }
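Registering the new step takes exactly two touches: one JobStepType constant and one matching JobStepFactory case. A self-contained sketch of that enum-dispatch pattern; StepType, Step and StepFactory are stand-ins for illustration, not Kylin classes:

    // Enum-dispatch factory in the style of JobStepFactory (stand-in names).
    enum StepType { RESOURCE_DETECT, MERGING, MERGE_STATISTICS, CLEAN_UP_AFTER_MERGE }

    interface Step {
        void execute();
    }

    final class StepFactory {
        static Step create(StepType type) {
            switch (type) {
                case MERGE_STATISTICS:
                    return () -> System.out.println("merge per-segment cuboid statistics");
                case CLEAN_UP_AFTER_MERGE:
                    return () -> System.out.println("update metadata and clean up merged segments");
                default:
                    throw new IllegalArgumentException("unsupported step type: " + type);
            }
        }
    }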
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
new file mode 100644
index 0000000..1884cd4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+public class NSparkMergeStatisticsStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkMergeStatisticsStep.class);
+
+    private List<CubeSegment> mergingSegments = Lists.newArrayList();
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public NSparkMergeStatisticsStep() {
+        this.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        String jobId = getParam(MetadataConstants.P_JOB_ID);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+
+        String mergedSegmentUuid = getParam(MetadataConstants.P_SEGMENT_IDS);
+        final KylinConfig kylinConfig = wrapConfig(context);
+        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId);
+        CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentUuid);
+
+        String jobTmpDir = kylinConfig.getJobTmpDir(cube.getProject()) + "/" + jobId;
+        Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/"
+                + cubeId + "/" + mergedSeg.getUuid() + "/");
+
+        mergingSegments = cube.getMergingSegments(mergedSeg);
+
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+        try {
+            int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            for (CubeSegment segment : mergingSegments) {
+                String segmentId = segment.getUuid();
+                String fileKey = CubeSegment
+                        .getStatisticsResourcePath(cube.getName(), segmentId);
+                InputStream is = rs.getResource(fileKey).content();
+                File tempFile = null;
+                FileOutputStream tempFileStream = null;
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        if (key.get() == 0L) {
+                            // sampling percentage;
+                            averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                            }
+                        } else if (key.get() > 0) {
+                            HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(key.get()) != null) {
+                                cuboidHLLMap.get(key.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(key.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                    if (tempFile != null)
+                        tempFile.delete();
+                }
+            }
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, statisticsDir, cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
+            Path statisticsFilePath = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            FSDataInputStream is = fs.open(statisticsFilePath);
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = mergedSeg.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(is);
+            }
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
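The heart of the step is the HyperLogLog fold: every merging segment contributes one HLLCounter per cuboid, and counters for the same cuboid ID are unioned with merge(). A minimal sketch of that semantics, assuming HLLCounter's add/merge/getCountEstimate API as used above; precision 14 is only an illustrative value, where the real code reads kylinConfig.getCubeStatsHLLPrecision():

    import org.apache.kylin.measure.hllc.HLLCounter;

    public class HllMergeSketch {
        public static void main(String[] args) {
            HLLCounter segmentA = new HLLCounter(14);
            HLLCounter segmentB = new HLLCounter(14);
            for (int i = 0; i < 1000; i++) {
                segmentA.add("row-" + i);
                segmentB.add("row-" + (i + 500)); // 50% overlap with segmentA
            }
            segmentA.merge(segmentB); // union of the two distinct-value sets
            // The estimate lands near 1500, not 2000: the HLL union de-duplicates.
            System.out.println("estimated distinct rows: " + segmentA.getCountEstimate());
        }
    }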
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index c52c89c..9a53067 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -87,6 +87,9 @@ public class NSparkMergingJob extends CubingJob {
         JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube);
         JobStepFactory.addStep(job, JobStepType.MERGING, cube);
+        if (KylinConfig.getInstanceFromEnv().isSegmentStatisticsEnabled()) {
+            JobStepFactory.addStep(job, JobStepType.MERGE_STATISTICS, cube);
+        }
         JobStepFactory.addStep(job, JobStepType.CLEAN_UP_AFTER_MERGE, cube);
 
         return job;
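Because the step is appended only when segment statistics are enabled, merge jobs on cubes that never sampled keep their old three-step shape. A sketch of the resulting sequences; the step names mirror JobStepType and the boolean stands in for KylinConfig.isSegmentStatisticsEnabled():

    import java.util.ArrayList;
    import java.util.List;

    public class MergeStepSequence {
        static List<String> stepsFor(boolean segmentStatisticsEnabled) {
            List<String> steps = new ArrayList<>();
            steps.add("RESOURCE_DETECT");
            steps.add("MERGING");
            if (segmentStatisticsEnabled) {
                steps.add("MERGE_STATISTICS"); // new in KYLIN-4996
            }
            steps.add("CLEAN_UP_AFTER_MERGE");
            return steps;
        }

        public static void main(String[] args) {
            System.out.println(stepsFor(true));  // [RESOURCE_DETECT, MERGING, MERGE_STATISTICS, CLEAN_UP_AFTER_MERGE]
            System.out.println(stepsFor(false)); // [RESOURCE_DETECT, MERGING, CLEAN_UP_AFTER_MERGE]
        }
    }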
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 3cfcc78..7619d36 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -501,8 +501,7 @@ public class JobService extends BasicService implements InitializingBean {
 
     public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId) == null) {
             return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
@@ -510,8 +509,7 @@ public class JobService extends BasicService implements InitializingBean {
 
     public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE) == null) {
            return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
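As committed, each accessor fetches the HDFS output twice: once for the null check and once for the returned value. An equivalent single-fetch variant of the same fallback, illustrative only and assuming getOutputFromHDFSByJobId returns Kylin's Output interface:

    // Single-fetch variant of getJobStepOutput; not part of this commit.
    public String getJobStepOutput(String jobId, String stepId) {
        ExecutableManager executableManager = getExecutableManager();
        Output hdfsOutput = executableManager.getOutputFromHDFSByJobId(jobId, stepId);
        return hdfsOutput == null
                ? executableManager.getOutput(stepId).getVerboseMsg()
                : hdfsOutput.getVerboseMsg();
    }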