This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/master by this push:
     new efedf07  [KYLIN-4947] Implement spark engine for cube optimization jobs.
efedf07 is described below

commit efedf07043c704cbdfa04ac3af3d73cec866cb76
Author: yangjiang <yangji...@ebay.com>
AuthorDate: Wed Apr 7 15:39:04 2021 +0800

    [KYLIN-4947] Implement spark engine for cube optimization jobs.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 +
 .../apache/kylin/cube/cuboid/CuboidScheduler.java  |   3 +-
 .../org/apache/kylin/cube/cuboid/CuboidUtil.java   |   1 +
 .../kylin/cube/cuboid/TreeCuboidScheduler.java     |   7 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java    |   3 +-
 .../kylin/job/constant/ExecutableConstants.java    |   6 +
 .../kylin/engine/mr/BatchOptimizeJobBuilder2.java  |   1 +
 .../kylin/engine/mr/common/BatchConstants.java     |   5 +-
 .../kylin/engine/mr/common/CubeStatsWriter.java    |   6 +
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   5 +
 .../engine/spark/SparkBatchCubingEngine2.java      |   7 +-
 .../spark/SparkBatchOptimizeJobBuilder2.java       | 211 ++++++++++++
 .../kylin/engine/spark/SparkBuildDictionary.java   |   4 +-
 .../SparkCalculateStatsFromBaseCuboidJob.java      | 354 +++++++++++++++++++++
 .../kylin/engine/spark/SparkCubingByLayer.java     |  10 +-
 .../engine/spark/SparkCubingByLayerForOpt.java     | 269 ++++++++++++++++
 .../kylin/engine/spark/SparkCubingMerge.java       |   4 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |  10 +-
 .../kylin/engine/spark/SparkFactDistinct.java      |   4 +-
 .../spark/SparkFilterRecommendCuboidDataJob.java   | 168 ++++++++++
 .../apache/kylin/engine/spark/SparkFunction.java   | 104 ++++++
 .../kylin/engine/spark/SparkMergingDictionary.java |   4 +-
 .../kylin/engine/spark/SparkUHCDictionary.java     |   4 +-
 .../SparkUpdateShardForOldCuboidDataStep.java      | 223 +++++++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  99 +++++-
 .../kylin/storage/hbase/steps/BulkLoadJob.java     |   2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |   2 +
 .../hbase/steps/HBaseSparkOutputTransition.java    |  26 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |   4 +-
 29 files changed, 1518 insertions(+), 36 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8d22fa3..767ae8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1731,6 +1731,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
     }
 
+    public boolean isSparkOptimizeCubeViaSparkEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-optimize-cube-enabled", TRUE));
+    }
+
+    public boolean isUseSparkCalculateStatsEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-calculate-stats-enabled", TRUE));
+    }
+
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 5c57fad..8ac19fd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -18,6 +18,7 @@ package org.apache.kylin.cube.cuboid; +import java.io.Serializable; import java.util.Collections; import
java.util.List; import java.util.Set; @@ -32,7 +33,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists; /** * Defines a cuboid tree, rooted by the base cuboid. A parent cuboid generates its child cuboids. */ -abstract public class CuboidScheduler { +abstract public class CuboidScheduler implements Serializable { public static CuboidScheduler getInstance(CubeDesc cubeDesc) { String clzName = cubeDesc.getConfig().getCuboidScheduler(); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java index aae3129..e40ea5f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java @@ -32,6 +32,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps; public class CuboidUtil { + // get the i cuboid the j '1' `s index public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) { Preconditions.checkArgument(nRowKey < Long.SIZE, "the size of row key could not be large than " + (Long.SIZE - 1)); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java index 40242c4..b18e38b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube.cuboid; import java.io.PrintWriter; +import java.io.Serializable; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -71,7 +72,7 @@ public class TreeCuboidScheduler extends CuboidScheduler { return cuboidTree.isValid(requestCuboid); } - public static class CuboidTree { + public static class CuboidTree implements Serializable { private int treeLevels; private TreeNode root; @@ -232,7 +233,7 @@ public class TreeCuboidScheduler extends CuboidScheduler { } } - public static class TreeNode { + public static class TreeNode implements Serializable { @JsonProperty("cuboid_id") long cuboidId; @JsonIgnore @@ -290,7 +291,7 @@ public class TreeCuboidScheduler extends CuboidScheduler { /** * Compare cuboid according to the cuboid data row count */ - public static class CuboidCostComparator implements Comparator<Long> { + public static class CuboidCostComparator implements Comparator<Long>, Serializable { private Map<Long, Long> cuboidStatistics; public CuboidCostComparator(Map<Long, Long> cuboidStatistics) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java index 71ad4bf..516ce7f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube.kv; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -34,7 +35,7 @@ import org.apache.kylin.metadata.model.TblColRef; * @author xjiang * */ -public class RowKeyDecoder { +public class RowKeyDecoder implements Serializable { private final CubeDesc cubeDesc; private final RowKeyColumnIO colIO; diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 4deab99..aa9e875 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -46,12 +46,17 @@ public final class ExecutableConstants { public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; + public static final String STEP_NAME_FACT_DISTINCT_COLUMNS_SPARK = "Extract Fact Table Distinct Columns With Spark"; public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid"; public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommend Cuboid Data for Optimization"; + public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK = "Filter Recommend Cuboid Data for Optimization with Spark"; + public static final String STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK = "Calculate Stats From BaseCuboid Step with Spark"; public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization"; + public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK = "Update Old Cuboid Shard for Optimization With Spark"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid"; public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem"; public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark"; + public static final String STEP_NAME_OPTIMIZE_SPARK_CUBE = "Optimize Cube with Spark"; public static final String STEP_NAME_BUILD_FLINK_CUBE = "Build Cube with Flink"; public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid"; public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits"; @@ -64,6 +69,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics"; public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data"; public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization"; + public static final String STEP_NAME_MERGE_STATISTICS_WITH_SPARK = "Merge Cuboid Statistics with Spark for Optimization"; public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics"; public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data"; public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java index 9e8b9e8..18a9ded 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java @@ -64,6 +64,7 @@ public class BatchOptimizeJobBuilder2 extends JobBuilderSupport { // Phase 1: Prepare base cuboid data from old segment String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*"; + // write to optimizeCuboidRootPath + /base_cuboid OR +/old result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath)); // Phase 2: Prepare dictionary and statistics for new segment diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index d0e2936..dd2dc6e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -94,7 +94,7 @@ public interface BatchConstants { String ARG_CUBE_NAME = "cubename"; String ARG_II_NAME = "iiname"; String ARG_SEGMENT_NAME = "segmentname"; - String ARG_SEGMENT_ID = "segmentid"; + String ARG_SEGMENT_ID = "segmentId"; String ARG_PARTITION = "partitions"; String ARG_STATS_ENABLED = "statisticsenabled"; String ARG_STATS_OUTPUT = "statisticsoutput"; @@ -107,7 +107,7 @@ public interface BatchConstants { String ARG_TABLE_NAME = "tableName"; String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID"; String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots"; - String ARG_META_URL = "metadataUrl"; + String ARG_META_URL = "metaUrl"; String ARG_HBASE_CONF_PATH = "hbaseConfPath"; String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath"; String ARG_COUNTER_OUTPUT = "counterOutput"; @@ -116,6 +116,7 @@ public interface BatchConstants { String ARG_BASE64_ENCODED_SQL = "base64EncodedSql"; String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dict_part_reduce_stats"; String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dict_max_distinct_count"; + String ARG_HIVE_TABLE = "hiveTable"; /** * logger and counter diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 3c41e1f..4348bdf 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -35,9 +35,13 @@ import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CubeStatsWriter { + protected static final Logger logger = LoggerFactory.getLogger(CubeStatsWriter.class); + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException { writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0); @@ -62,6 +66,8 @@ public class CubeStatsWriter { Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, int shard) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard); + logger.info("writePartialCuboidStatistics for cuboid: " + cuboidHLLMap.keySet().toString()); + logger.info("writePartialCuboidStatistics Path: " + seqFilePath); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio, 0); } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java index a50c11d..5cf1e2f 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java @@ -86,6 +86,8 @@ public class KylinKryoRegistrator implements KryoRegistrator { 
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class); kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class); kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class); + kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.TreeNode.class); + kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidCostComparator.class); kylinClassByReflection1(kyroClasses); kylinClassByReflection2(kyroClasses); @@ -146,6 +148,8 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.apache.kylin.cube.common.RowKeySplitter.class); kyroClasses.add(org.apache.kylin.cube.cuboid.Cuboid.class); kyroClasses.add(org.apache.kylin.cube.cuboid.DefaultCuboidScheduler.class); + kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.class); + kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidTree.class); kyroClasses.add(org.apache.kylin.cube.gridtable.TrimmedDimensionSerializer.class); kyroClasses.add(org.apache.kylin.cube.kv.AbstractRowKeyEncoder.class); kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class); @@ -155,6 +159,7 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class); kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class); kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyDecoder.class); kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.class); kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class); kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.class); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java index d3afb03..24f3f91 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java @@ -51,8 +51,11 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine { @Override public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) { - //TODO use Spark to optimize - return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + if (optimizeSegment.getConfig().isSparkOptimizeCubeViaSparkEnable()) { + return new SparkBatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + } else { + return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + } } @Override diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java new file mode 100644 index 0000000..2971857 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark; + +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.steps.CopyDictionaryStep; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep; +import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SparkBatchOptimizeJobBuilder2 extends JobBuilderSupport { + private static final Logger logger = LoggerFactory.getLogger(SparkBatchOptimizeJobBuilder2.class); + + private final ISparkOutput.ISparkBatchOptimizeOutputSide outputSide; + + public SparkBatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) { + super(optimizeSegment, submitter); + this.outputSide = SparkUtil.getBatchOptimizeOutputSide2(seg); + } + + public CubingJob build() { + logger.info("Spark new job to Optimize segment " + seg); + final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config); + final String jobId = result.getId(); + final String cuboidRootPath = getCuboidRootPath(jobId); + + final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId); + + CubeSegment oldSegment = seg.getCubeInstance().getOriginalSegmentToOptimize(seg); + Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + seg); + + // Phase 1: Prepare base cuboid data from old segment + String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*"; + // Filter cuboid to optimizeCuboidRootPath + /base_cuboid and +/old + result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath, jobId)); + + // Phase 2: Prepare dictionary and statistics for new segment + result.addTask(createCopyDictionaryStep()); + String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath); + String optStatsDstPath = getOptimizationStatisticsPath(jobId); + // Calculate statistic + if (seg.getConfig().isUseSparkCalculateStatsEnable()) { + result.addTask(createCalculateStatsFromBaseCuboidStepWithSpark(optStatsSourcePath, optStatsDstPath, + CuboidModeEnum.RECOMMEND_MISSING, jobId)); + } else { + result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath, + CuboidModeEnum.RECOMMEND_MISSING)); + } + + result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId))); + outputSide.addStepPhase2_CreateHTable(result); + + result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath, cuboidRootPath, jobId)); + + // Phase 3: Build Cube for Missing Cuboid Data + addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, + 
SparkUtil.generateFilePath(PathNameCuboidBase, cuboidRootPath), cuboidRootPath); // layer cubing + + outputSide.addStepPhase3_BuildCube(result); + + // Phase 4: Update Metadata & Cleanup + result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId)); + outputSide.addStepPhase4_Cleanup(result); + + return result; + } + + private SparkExecutable createCalculateStatsFromBaseCuboidStepWithSpark(String inputPath, String outputPath, + CuboidModeEnum recommendMissing, String jobId) { + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig()); + sparkExecutable.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK); + + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_CUBE_NAME.getOpt(), + seg.getRealization().getName()); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_INPUT_PATH.getOpt(), inputPath); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_OUTPUT_PATH.getOpt(), outputPath); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_META_URL.getOpt(), + getSegmentMetadataUrl(seg.getConfig(), jobId)); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_JOB_MODE.getOpt(), + recommendMissing.toString()); + sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SAMPLING_PERCENT.getOpt(), + String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); + //need JobId in sparkExecutable + sparkExecutable.setJobId(jobId); + + sparkExecutable.setClassName(SparkCalculateStatsFromBaseCuboidJob.class.getName()); + + return sparkExecutable; + } + + private SparkExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath , String jobId) { + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig()); + sparkExecutable.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK); + + sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); + sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_INPUT_PATH.getOpt(), inputPath); + sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_OUTPUT_PATH.getOpt(), outputPath); + sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId)); + sparkExecutable.setClassName(SparkFilterRecommendCuboidDataJob.class.getName()); + //need JobId in sparkExecutable + sparkExecutable.setJobId(jobId); + + return sparkExecutable; + } + + private UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) { + final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + + return result; + } + + private void addLayerCubingSteps(CubingJob result, String jobId, CuboidModeEnum mode, String input, String output) { + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig()); + 
sparkExecutable.setClassName(SparkCubingByLayerForOpt.class.getName()); + configureSparkJob(seg, sparkExecutable, jobId, input, output, mode); + result.addTask(sparkExecutable); + } + + private SparkExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath, String jobId) { + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig()); + sparkExecutable.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK); + + sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_CUBE_NAME.getOpt(), + seg.getRealization().getName()); + sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_INPUT_PATH.getOpt(), inputPath); + sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_OUTPUT_PATH.getOpt(), outputPath); + sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_META_URL.getOpt(), + getSegmentMetadataUrl(seg.getConfig(), jobId)); + //need JobId in sparkExecutable + sparkExecutable.setJobId(jobId); + sparkExecutable.setClassName(SparkUpdateShardForOldCuboidDataStep.class.getName()); + + return sparkExecutable; + } + + private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(String jobId, String optStatsPath, + String mergedStatisticsFolder) { + MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep(); + result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD); + + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams()); + CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams()); + + return result; + } + + private AbstractExecutable createCopyDictionaryStep() { + CopyDictionaryStep result = new CopyDictionaryStep(); + result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + return result; + } + + private void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, final String jobId, + final String input, String output, CuboidModeEnum mode) { + + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_META_URL.getOpt(), + getSegmentMetadataUrl(seg.getConfig(), jobId)); + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_OUTPUT_PATH.getOpt(), output); + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_INPUT_PATH.getOpt(), input); + sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBOID_MODE.getOpt(), mode.toString()); + sparkExecutable.setJobId(jobId); + + StringBuilder jars = new StringBuilder(); + + StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars()); + sparkExecutable.setJars(jars.toString()); + sparkExecutable.setName(ExecutableConstants.STEP_NAME_OPTIMIZE_SPARK_CUBE + ":" + seg.toString()); + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java index 25020eb..2d29e1a 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java @@ -93,14 +93,14 @@ public class SparkBuildDictionary extends AbstractApplication implements Seriali public static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg() .isRequired(true).withDescription("Cube dictionary output path").create(BatchConstants.ARG_DICT_PATH); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_CUBING_JOB_ID = OptionBuilder .withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(true) .withDescription("Cubing job id").create(BatchConstants.ARG_CUBING_JOB_ID); public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT).hasArg() .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java new file mode 100644 index 0000000..838efc0 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinVersion; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.cube.cuboid.CuboidUtil; +import org.apache.kylin.cube.kv.RowKeyDecoder; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; +import org.apache.kylin.engine.mr.common.SerializableConfiguration; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.shaded.com.google.common.hash.HashFunction; +import org.apache.kylin.shaded.com.google.common.hash.Hasher; +import org.apache.kylin.shaded.com.google.common.hash.Hashing; +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import scala.Tuple2; + +public class SparkCalculateStatsFromBaseCuboidJob extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCalculateStatsFromBaseCuboidJob.class); + + protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); + + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() + .isRequired(true).create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true) + .create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_INPUT); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true) + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); + public static final Option OPTION_JOB_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg().isRequired(true) + .create(BatchConstants.ARG_CUBOID_MODE); + public static final Option OPTION_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg() + 
.isRequired(true).create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + + private Options options; + + private int samplingPercent; + private int rowCount = 0; + private long[] rowHashCodesLong = null; + //about details of the new algorithm, please see KYLIN-2518 + private boolean isUsePutRowKeyToHllNewAlgorithm; + + private HLLCounter[] allCuboidsHLL = null; + private Long[] cuboidIds; + private Integer[][] allCuboidsBitSet = null; + private HashFunction hf = null; + + protected int nRowKey; + protected long baseCuboidId; + + RowKeyDecoder rowKeyDecoder; + + public SparkCalculateStatsFromBaseCuboidJob() { + options = new Options(); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_META_URL); + options.addOption(OPTION_JOB_MODE); + options.addOption(OPTION_SAMPLING_PERCENT); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String input = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + String output = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); + String jobMode = optionsHelper.getOptionValue(OPTION_JOB_MODE); + this.samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_SAMPLING_PERCENT)); + + SparkConf sparkConf = SparkUtil.setKryoSerializerInConf(); + sparkConf.setAppName("Kylin_Calculate_Statics_From_BaseCuboid_Data_" + cubeName + "_With_Spark"); + + KylinSparkJobListener jobListener = new KylinSparkJobListener(); + try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) { + sc.sc().addSparkListener(jobListener); + + HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output)); + + SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); + KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + CubeSegment optSegment; + int cubeStatsHLLPrecision; + try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig + .setAndUnsetThreadLocalConfig(config)) { + + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(cubeName); + CubeDesc cubeDesc = cube.getDescriptor(); + optSegment = cube.getSegmentById(segmentId); + + baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); + nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; + + Set<Long> cuboids = cube.getCuboidsByMode(jobMode); + if (cuboids.size() == 0) { + Set<Long> current = cube.getCuboidsByMode(CuboidModeEnum.CURRENT); + current.removeAll(cube.getCuboidsRecommend()); + cuboids = current; + } + cuboidIds = cuboids.toArray(new Long[cuboids.size()]); + allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey); + cubeStatsHLLPrecision = config.getCubeStatsHLLPrecision(); + + + allCuboidsHLL = new HLLCounter[cuboidIds.length]; + for (int i = 0; i < cuboidIds.length; i++) { + allCuboidsHLL[i] = new HLLCounter(cubeStatsHLLPrecision); + } + + //for KYLIN-2518 backward compatibility + if (KylinVersion.isBefore200(cubeDesc.getVersion())) { + isUsePutRowKeyToHllNewAlgorithm = false; + hf = Hashing.murmur3_32(); + logger.info("Found KylinVersion : {}. 
Use old algorithm for cuboid sampling.", cubeDesc.getVersion()); + } else { + isUsePutRowKeyToHllNewAlgorithm = true; + rowHashCodesLong = new long[nRowKey]; + hf = Hashing.murmur3_128(); + logger.info( + "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", + cubeDesc.getVersion()); + } + } + + JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(input, Text.class, Text.class); + + JavaPairRDD<Text, Text> afterMapRDD = inputRDD.mapPartitionsToPair( + new SparkFunction.PairFlatMapFunctionBase<Iterator<Tuple2<Text, Text>>, Text, Text>() { + @Override + protected void doInit() { + KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + CubeSegment cubeSeg = cubeInstance.getSegmentById(segmentId); + rowKeyDecoder = new RowKeyDecoder(cubeSeg); + } + + @Override + protected Iterator<Tuple2<Text, Text>> doCall(Iterator<Tuple2<Text, Text>> iterator) + throws Exception { + while (iterator.hasNext()) { + Text key = iterator.next()._1(); + long cuboidID = rowKeyDecoder.decode(key.getBytes()); + if (cuboidID != baseCuboidId) { + continue; // Skip data from cuboids which are not the base cuboid + } + + List<String> keyValues = rowKeyDecoder.getValues(); + + if (rowCount < samplingPercent) { + Preconditions.checkArgument(nRowKey == keyValues.size()); + + String[] row = keyValues.toArray(new String[keyValues.size()]); + if (isUsePutRowKeyToHllNewAlgorithm) { + putRowKeyToHLLNew(row); + } else { + putRowKeyToHLLOld(row); + } + } + + if (++rowCount == 100) + rowCount = 0; + } + + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + HLLCounter hll; + List<Tuple2<Text, Text>> result = new ArrayList<>(); + + for (int i = 0; i < cuboidIds.length; i++) { + hll = allCuboidsHLL[i]; + Text outputKey = new Text(); + Text outputValue = new Text(); + + outputKey.set(Bytes.toBytes(cuboidIds[i])); + logger.info("Cuboid id to be processed1: " + cuboidIds[i]); + hllBuf.clear(); + hll.writeRegisters(hllBuf); + outputValue.set(hllBuf.array(), 0, hllBuf.position()); + logger.info("Cuboid id to be processed1: " + cuboidIds[i] + "value is " + + hllBuf.array().toString()); + result.add(new Tuple2<Text, Text>(outputKey, outputValue)); + logger.info("result size: " + result.size()); + for (Tuple2<Text, Text> t : result) { + logger.info("result key: " + t._1().toString()); + logger.info("result values: " + t._2.toString()); + } + } + return result.iterator(); + } + + + }); + + afterMapRDD.groupByKey().foreach(new SparkFunction.VoidFunctionBase<Tuple2<Text, Iterable<Text>>>() { + @Override + protected void doInit() { + KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig); + } + + @Override + protected void doCall(Tuple2<Text, Iterable<Text>> v1) throws Exception { + Text key = v1._1(); + Iterable<Text> values = v1._2(); + + long cuboidId = Bytes.toLong(key.getBytes()); + logger.info("Cuboid id to be processed: " + cuboidId); + + List<Long> baseCuboidRowCountInMappers = Lists.newArrayList(); + long totalRowsBeforeMerge = 0; + + for (Text value : values) { + HLLCounter hll = new HLLCounter(cubeStatsHLLPrecision); + ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength()); + hll.readRegisters(bf); + + if (cuboidId == baseCuboidId) { + baseCuboidRowCountInMappers.add(hll.getCountEstimate()); + } + + 
totalRowsBeforeMerge += hll.getCountEstimate(); + + if (cuboidHLLMap.get(cuboidId) != null) { + cuboidHLLMap.get(cuboidId).merge(hll); + } else { + cuboidHLLMap.put(cuboidId, hll); + } + } + + long grandTotal = 0; + for (HLLCounter hll : cuboidHLLMap.values()) { + grandTotal += hll.getCountEstimate(); + } + double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + + logger.info("writer cuboIdstatic to " + output); + CubeStatsWriter.writePartialCuboidStatistics(sConf.get(), new Path(output), cuboidHLLMap, + samplingPercent, baseCuboidRowCountInMappers.size(), mapperOverlapRatio, + TaskContext.getPartitionId()); + } + }); + } + + } + + private void putRowKeyToHLLOld(String[] row) { + //generate hash for each row key column + byte[][] rowHashCodes = new byte[nRowKey][]; + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[i]; + if (colValue != null) { + rowHashCodes[i] = hc.putUnencodedChars(colValue).hash().asBytes(); + } else { + rowHashCodes[i] = hc.putInt(0).hash().asBytes(); + } + } + + // use the row key column hash to get a consolidated hash for each cuboid + for (int i = 0; i < cuboidIds.length; i++) { + Hasher hc = hf.newHasher(); + for (int position = 0; position < allCuboidsBitSet[i].length; position++) { + hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]); + } + + allCuboidsHLL[i].add(hc.hash().asBytes()); + } + } + + private void putRowKeyToHLLNew(String[] row) { + //generate hash for each row key column + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[i]; + if (colValue == null) + colValue = "0"; + byte[] bytes = hc.putUnencodedChars(colValue).hash().asBytes(); + rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a) + } + + // user the row key column hash to get a consolidated hash for each cuboid + for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) { + long value = 0; + for (int position = 0; position < allCuboidsBitSet[i].length; position++) { + value += rowHashCodesLong[allCuboidsBitSet[i][position]]; + } + allCuboidsHLL[i].addHashDirectly(value); + } + } + +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 4c9c391..ab0a565 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -96,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); - public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); - public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true) + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true) + 
.withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true) - .withDescription("Hive Intermediate Table").create("hiveTable"); + .withDescription("Hive Intermediate Table").create(BatchConstants.ARG_HIVE_TABLE); public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT); public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java new file mode 100644 index 0000000..8857cc8 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.cuboid.CuboidUtil; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.IMROutput2; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; +import org.apache.kylin.engine.mr.common.NDCuboidBuilder; +import org.apache.kylin.engine.mr.common.SerializableConfiguration; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.storage.StorageLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +public class SparkCubingByLayerForOpt extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayerForOpt.class); + + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() + .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true) + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() + .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() + .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT); + public static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg() + .isRequired(true).withDescription("CoboId Mode ").create(BatchConstants.ARG_CUBOID_MODE); + + private Options options; + + public SparkCubingByLayerForOpt() { + options = new Options(); + options.addOption(OPTION_CUBOID_MODE); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + 
options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_META_URL); + options.addOption(OPTION_OUTPUT_PATH); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); + String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + String cuboidMode = optionsHelper.getOptionValue(OPTION_CUBOID_MODE); + + SparkConf sparkConf = SparkUtil.setKryoSerializerInConf(); + sparkConf.setAppName("Kylin_Cubing_For_Optimize_" + cubeName + "_With_Spark"); + + KylinSparkJobListener jobListener = new KylinSparkJobListener(); + try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) { + SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob + .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress + sc.sc().addSparkListener(jobListener); + + final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); + final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + + final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName); + final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName()); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig); + + final Job job = Job.getInstance(sConf.get()); + SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl); + + StorageLevel storageLevel = StorageLevel.fromString(envConfig.getSparkStorageLevel()); + JavaPairRDD<ByteArray, Object[]> baseCuboIdRDD = SparkUtil.getCuboIdRDDFromHdfs(sc, metaUrl, cubeName, + cubeSegment, inputPath, cubeDesc.getMeasures().size(), sConf); + + // Don't know statistics so that tree cuboid scheduler is not determined. 
Determine the maxLevel at runtime + final Set<Long> cuboidsByMode = cubeSegment.getCubeInstance().getCuboidsByMode(cuboidMode); + final int maxLevel = CuboidUtil.getLongestDepth(cuboidsByMode); + + logger.info("cuboidMode" + cuboidMode); + logger.info("maxLevel" + maxLevel); + + CuboidScheduler scheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidMode); + + JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[maxLevel + 1]; + allRDDs[0] = baseCuboIdRDD; + + SparkCubingByLayer.BaseCuboidReducerFunction2 reducerFunction2 = new SparkCubingByLayer.BaseCuboidReducerFunction2( + cubeName, metaUrl, sConf); + + boolean allNormalMeasure = true; + boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; + for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { + needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); + allNormalMeasure = allNormalMeasure && needAggr[i]; + } + + if (!allNormalMeasure) { + reducerFunction2 = new SparkCubingByLayer.CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr); + } + + for (int i = 1; i <= maxLevel; i++) { + int partition = SparkUtil.estimateLayerPartitionNum(i, cubeStatsReader, envConfig); + allRDDs[i] = allRDDs[i - 1] + .flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf, scheduler)) + .reduceByKey(reducerFunction2, partition); + allRDDs[i].persist(storageLevel); + saveToHDFS(allRDDs[i], metaUrl, cubeName, cubeSegment, outputPath, i, job); + // must do unpersist after allRDDs[level] is created, otherwise this RDD will be recomputed + allRDDs[i - 1].unpersist(false); + } + allRDDs[maxLevel].unpersist(false); + logger.info("Finished on calculating needed cuboids For Optimize."); + logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten()); + + } + + } + + static public class CuboidFlatMap + extends SparkFunction.PairFlatMapFunctionBase<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { + + private String cubeName; + private String segmentId; + private String metaUrl; + private CubeSegment cubeSegment; + private CubeDesc cubeDesc; + private NDCuboidBuilder ndCuboidBuilder; + private RowKeySplitter rowKeySplitter; + private SerializableConfiguration conf; + private CuboidScheduler cuboidScheduler; + + public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf, + CuboidScheduler scheduler) { + this.cubeName = cubeName; + this.segmentId = segmentId; + this.metaUrl = metaUrl; + this.conf = conf; + this.cuboidScheduler = scheduler; + } + + @Override + protected void doInit() { + KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl); + try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) { + CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName); + this.cubeSegment = cubeInstance.getSegmentById(segmentId); + this.cubeDesc = cubeInstance.getDescriptor(); + this.rowKeySplitter = new RowKeySplitter(cubeSegment); + this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment)); + } + } + + @Override + public Iterator<Tuple2<ByteArray, Object[]>> doCall(final Tuple2<ByteArray, Object[]> tuple2) throws Exception { + byte[] key = tuple2._1().array(); + long cuboidId = rowKeySplitter.parseCuboid(key); + final List<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); + + // if still empty or null + if (myChildren == null || myChildren.size() == 0) { + 
return EMTPY_ITERATOR.iterator(); + } + rowKeySplitter.split(key); + final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId); + + List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size()); + for (Long child : myChildren) { + Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child); + ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid, + rowKeySplitter.getSplitBuffers()); + + tuples.add(new Tuple2<>(result, tuple2._2())); + } + + return tuples.iterator(); + } + } + + private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0); + + protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName, + final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job) throws Exception { + final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); + final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration()); + + IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOutputFormat(); + outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level); + + rdd.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<ByteArray, Object[]>, Text, Text>() { + private BufferedMeasureCodec codec; + + @Override + protected void doInit() { + KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) { + CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName); + codec = new BufferedMeasureCodec(desc.getMeasures()); + } + } + + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> doCall( + Tuple2<ByteArray, Object[]> tuple2) throws Exception { + ByteBuffer valueBuf = codec.encode(tuple2._2()); + org.apache.hadoop.io.Text textResult = new org.apache.hadoop.io.Text(); + textResult.set(valueBuf.array(), 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), textResult); + } + }).saveAsNewAPIHadoopDataset(job.getConfiguration()); + logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath); + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java index 94f3a4e..71cd5aa 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java @@ -68,9 +68,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static 
final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 66ae57e..fca1839 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -210,7 +210,7 @@ public class SparkExecutable extends AbstractExecutable { if (!StringUtils.isEmpty(sparkJobId)) { return onResumed(sparkJobId, mgr); } else { - String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt()); + String cubeName = this.getParam(BatchConstants.ARG_CUBE_NAME); CubeInstance cube; if (cubeName != null) { cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName); @@ -222,7 +222,7 @@ public class SparkExecutable extends AbstractExecutable { config = cube.getConfig(); } else { // when loading hive table, we can't get cube name/config, so we get config from project. - String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt()); + String projectName = this.getParam(BatchConstants.ARG_PROJECT); ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName); config = projectInst.getConfig(); } @@ -253,7 +253,7 @@ public class SparkExecutable extends AbstractExecutable { if (cube != null && !isCreateFlatTable()) { setAlgorithmLayer(); - String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()); + String segmentID = this.getParam(BatchConstants.ARG_SEGMENT_ID); CubeSegment segment = cube.getSegmentById(segmentID); Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment); dumpMetadata(segment, mergingSeg); @@ -522,7 +522,7 @@ public class SparkExecutable extends AbstractExecutable { CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList); JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), - this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt())); + this.getParam(BatchConstants.ARG_META_URL)); } private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException { @@ -539,7 +539,7 @@ public class SparkExecutable extends AbstractExecutable { CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList); } JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), - this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt())); + this.getParam(BatchConstants.ARG_META_URL)); } protected void readCounters(final Map<String, String> info) { diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java index a116cc8..d2020f1 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java @@ -110,11 +110,11 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() 
.isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true) .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java new file mode 100644 index 0000000..704dc8d --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
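The recurring change across these hunks replaces hard-coded option names such as "segmentId" and "metaUrl" with the shared BatchConstants fields, and SparkExecutable now looks up its parameters through the same constants, so the code that assembles the Spark job arguments and the code that parses them can no longer drift apart. A hedged, standalone illustration of the idea follows; the constant values and the GnuParser choice are assumptions for the sketch, not the exact Kylin wiring.

    // Sketch: declaring CLI options from shared name constants so the code that
    // builds the argument list and the code that parses it stay in sync.
    // The constant values below are placeholders standing in for BatchConstants.
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;

    public class SharedArgNamesSketch {

        static final String ARG_SEGMENT_ID = "segmentId"; // stand-in for BatchConstants.ARG_SEGMENT_ID
        static final String ARG_META_URL = "metaUrl";     // stand-in for BatchConstants.ARG_META_URL

        public static void main(String[] args) throws Exception {
            Option segmentId = OptionBuilder.withArgName(ARG_SEGMENT_ID).hasArg().isRequired(true)
                    .withDescription("Cube Segment Id").create(ARG_SEGMENT_ID);
            Option metaUrl = OptionBuilder.withArgName(ARG_META_URL).hasArg().isRequired(true)
                    .withDescription("HDFS metadata url").create(ARG_META_URL);

            Options options = new Options();
            options.addOption(segmentId);
            options.addOption(metaUrl);

            // the submitting side assembles "-segmentId ... -metaUrl ..." from the same constants
            String[] demo = {"-" + ARG_SEGMENT_ID, "seg-0001", "-" + ARG_META_URL, "kylin_metadata@hdfs"};
            CommandLine cmd = new GnuParser().parse(options, demo);
            System.out.println(cmd.getOptionValue(ARG_SEGMENT_ID) + " / " + cmd.getOptionValue(ARG_META_URL));
        }
    }

SparkExecutable benefits the same way: getParam(BatchConstants.ARG_CUBE_NAME), getParam(BatchConstants.ARG_SEGMENT_ID) and the other lookups now read back exactly the keys that the job builders wrote.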
+ */ + +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceUtil; +import org.apache.kylin.engine.mr.common.SerializableConfiguration; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.io.Serializable; + +import java.util.Set; + +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase; +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld; +import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut; +import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath; + +public class SparkFilterRecommendCuboidDataJob extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkFilterRecommendCuboidDataJob.class); + + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() + .isRequired(true).create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true) + .create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_INPUT); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true) + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); + + private Options options; + + public SparkFilterRecommendCuboidDataJob() { + options = new Options(); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_META_URL); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + String outputPath = 
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); + + boolean enableSharding; + long baseCuboid; + Set<Long> recommendCuboids; + + SparkConf sparkConf = SparkUtil.setKryoSerializerInConf(); + sparkConf.setAppName("Kylin_Filter_Recommend_Cuboid_Data_" + cubeName + "_With_Spark"); + + KylinSparkJobListener jobListener = new KylinSparkJobListener(); + try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) { + sc.sc().addSparkListener(jobListener); + + final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); + KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + + HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath)); + + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(cubeName); + CubeSegment optSegment = cube.getSegmentById(segmentId); + CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment); + + enableSharding = originalSegment.isEnableSharding(); + baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + + recommendCuboids = cube.getCuboidsRecommend(); + Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null"); + + FileSystem hdfs = FileSystem.get(sc.hadoopConfiguration()); + if (!hdfs.exists(new Path(inputPath.substring(0, inputPath.length() - 1)))) { + throw new IOException("OldCuboIdFilePath " + inputPath + " does not exists"); + } + + // inputPath is oldcuboidRootPath + JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class); + + logger.info("start to calculate nBaseReduceTasks"); + Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment); + int reduceTasks = taskNums.getFirst(); + int nBaseReduceTasks = taskNums.getSecond(); + logger.info("nBaseReduceTasks is {}", nBaseReduceTasks); + + final Job job = Job.getInstance(sConf.get()); + SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl); + + JavaPairRDD<Text, Text> baseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() { + @Override + public Boolean call(Tuple2<Text, Text> v1) throws Exception { + long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding); + return cuboidId == baseCuboid; + } + }); + + configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath)); + baseCuboIdRDD.coalesce(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration()); + + JavaPairRDD<Text, Text> reuseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() { + @Override + public Boolean call(Tuple2<Text, Text> v1) throws Exception { + long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding); + return recommendCuboids.contains(cuboidId); + } + }); + + configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath)); + reuseCuboIdRDD.coalesce(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration()); + + } + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java new file mode 100644 index 0000000..39917a3 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
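SparkFilterRecommendCuboidDataJob reads the existing cuboid files once and splits them into two outputs by inspecting the cuboid id encoded in each row key: base-cuboid rows that will be re-aggregated, and rows of already-built cuboids that the optimized segment can reuse. A minimal sketch of that filter-and-split shape is shown below, with a toy key format standing in for RowKeySplitter.getCuboidId(); the ids and names are illustrative.

    // Sketch of the filter-and-split pattern: one source RDD, two filtered
    // views, each saved separately. Parsing an id prefix from a string key is
    // a stand-in for RowKeySplitter.getCuboidId(...) on the binary row key.
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class FilterRecommendSketch {

        public static void main(String[] args) {
            long baseCuboid = 255L;
            Set<Long> recommended = new HashSet<>(Arrays.asList(17L, 96L));
            try (JavaSparkContext sc = new JavaSparkContext("local[2]", "filter-sketch")) {
                // keys carry a cuboid id prefix, values are the measure payload
                JavaPairRDD<String, String> rows = sc.parallelizePairs(Arrays.asList(
                        new Tuple2<>("255|r1", "m1"), new Tuple2<>("17|r2", "m2"),
                        new Tuple2<>("96|r3", "m3"), new Tuple2<>("12|r4", "m4")));

                JavaPairRDD<String, String> baseRows =
                        rows.filter(t -> parseCuboidId(t._1()) == baseCuboid);
                JavaPairRDD<String, String> reusedRows =
                        rows.filter(t -> recommended.contains(parseCuboidId(t._1())));

                // in the real job each subset is coalesced and saved via saveAsNewAPIHadoopDataset
                System.out.println("base rows:   " + baseRows.collect());
                System.out.println("reused rows: " + reusedRows.collect());
            }
        }

        private static long parseCuboidId(String key) {
            return Long.parseLong(key.substring(0, key.indexOf('|')));
        }
    }

In the patch the two filtered RDDs are then coalesced to the reducer counts computed by MapReduceUtil.getConvergeCuboidDataReduceTaskNums and written to the base-cuboid and old-cuboid output folders respectively.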
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark; + +import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.Iterator; + +public class SparkFunction { + + protected static final Logger logger = LoggerFactory.getLogger(SparkFunction.class); + + private static abstract class FunctionBase implements Serializable { + private volatile transient boolean initialized = false; + private transient int recordCounter; + + protected abstract void doInit(); + + protected void init() { + if (!initialized) { + synchronized (SparkFunction.class) { + if (!initialized) { + logger.info("Start to do init for {}", this); + doInit(); + initialized = true; + recordCounter = 0; + } + } + } + if (recordCounter++ % SparkUtil.getNormalRecordLogThreshold() == 0) { + logger.info("Accepting record with ordinal: " + recordCounter); + logger.info("Do call, available memory: {}m", MemoryBudgetController.getSystemAvailMB()); + } + } + } + + public static abstract class PairFunctionBase<T, K, V> extends FunctionBase implements PairFunction<T, K, V> { + + protected abstract Tuple2<K, V> doCall(T t) throws Exception; + + @Override + public Tuple2<K, V> call(T t) throws Exception { + init(); + return doCall(t); + } + } + + public static abstract class Function2Base<T1, T2, R> extends FunctionBase implements Function2<T1, T2, R> { + + protected abstract R doCall(T1 v1, T2 v2) throws Exception; + + @Override + public R call(T1 v1, T2 v2) throws Exception { + init(); + return doCall(v1, v2); + } + } + + public static abstract class PairFlatMapFunctionBase<T, K, V> extends FunctionBase implements PairFlatMapFunction<T, K, V> { + + protected abstract Iterator<Tuple2<K, V>> doCall(T t) throws Exception; + + @Override + public Iterator<Tuple2<K, V>> call(T t) throws Exception { + init(); + return doCall(t); + } + } + + public static abstract class VoidFunctionBase<T> extends FunctionBase implements VoidFunction<T> { + + protected abstract void doCall(T t) throws Exception; + + @Override + public void call(T t) throws Exception { + init(); + doCall(t); + } + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java index ddd0755..4a4bbb0 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java @@ -82,9 +82,9 @@ public class 
SparkMergingDictionary extends AbstractApplication implements Seria public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg() .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds"); public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg() diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java index 0662abb..f1c4419 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java @@ -76,11 +76,11 @@ public class SparkUHCDictionary extends AbstractApplication implements Serializa public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true) - .withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT); public static final Option OPTION_CUBING_JOB_ID = OptionBuilder diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java new file mode 100644 index 0000000..f153459 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
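The SparkFunction helper added earlier in this patch wraps Spark's function interfaces so that expensive, non-serializable state (KylinConfig, measure codecs, row-key encoders) is built lazily on the executor through a doInit() hook before the first record is processed; the shard-update step whose source begins here relies on exactly that. A simplified, hedged re-creation of the template follows; the real Kylin class additionally throttles progress logging and synchronizes on a shared class lock.

    // Simplified re-creation of the SparkFunction.PairFunctionBase idea: defer
    // creation of heavyweight, non-serializable state to a doInit() hook that
    // runs on the executor before the first record is handled.
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public abstract class LazyInitPairFunction<T, K, V> implements PairFunction<T, K, V> {

        private transient volatile boolean initialized = false;

        /** Build executor-side state here (configs, codecs, encoders, ...). */
        protected abstract void doInit();

        /** Per-record work, free to use the state built in doInit(). */
        protected abstract Tuple2<K, V> doCall(T input) throws Exception;

        @Override
        public Tuple2<K, V> call(T input) throws Exception {
            if (!initialized) {                 // lazy init once per deserialized instance
                synchronized (this) {
                    if (!initialized) {
                        doInit();
                        initialized = true;
                    }
                }
            }
            return doCall(input);
        }
    }

Because PairFunction already extends Serializable, the transient flag resets to false after the function is deserialized on each executor, which is what makes the initialization run there rather than on the driver.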
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceUtil; +import org.apache.kylin.engine.mr.common.SerializableConfiguration; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; + +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase; +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld; +import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut; +import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath; + +public class SparkUpdateShardForOldCuboidDataStep extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkUpdateShardForOldCuboidDataStep.class); + + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() + .isRequired(true).create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true) + .create(BatchConstants.ARG_SEGMENT_ID); + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_INPUT); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() + .isRequired(true).create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); + + private Options options; + private CubeDesc cubeDesc; + private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; + + private byte[] newKeyBodyBuf = new 
byte[RowConstants.ROWKEY_BUFFER_SIZE]; + private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); + + public SparkUpdateShardForOldCuboidDataStep() { + options = new Options(); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_META_URL); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + // input is optimizeCuboidRootPath + "*", output is cuboidRootPath. + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); + + String baseCuboIdInputPath = inputPath + PathNameCuboidBase; + String oldCuboIdInputPath = inputPath + PathNameCuboidOld; + + SparkConf sparkConf = SparkUtil.setKryoSerializerInConf(); + sparkConf.setAppName("Update_Old_Cuboid_Shard_for_Optimization" + cubeName + "_With_Spark"); + + KylinSparkJobListener jobListener = new KylinSparkJobListener(); + try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) { + sc.sc().addSparkListener(jobListener); + + final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); + KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + + HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath)); + + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(cubeName); + CubeSegment optSegment = cube.getSegmentById(segmentId); + CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment); + // + optSegment.setCubeInstance(originalSegment.getCubeInstance()); + + JavaPairRDD<Text, Text> baseCuboIdRDD = sc.sequenceFile(baseCuboIdInputPath, Text.class, Text.class); + JavaPairRDD<Text, Text> oldCuboIdRDD = sc.sequenceFile(oldCuboIdInputPath, Text.class, Text.class); + + cubeDesc = cube.getDescriptor(); + + logger.info("start to calculate nBaseReduceTasks"); + Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment); + int reduceTasks = taskNums.getFirst(); + int nBaseReduceTasks = taskNums.getSecond(); + logger.info("nBaseReduceTasks is {}", nBaseReduceTasks); + + final Job job = Job.getInstance(sConf.get()); + SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl); + + //UpdateCuboidShard for baseCuboId + JavaPairRDD<Text, Text> mapBaseCuboIdRDD = baseCuboIdRDD.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() { + @Override + protected void doInit() { + initMethod(sConf, metaUrl, cubeName, optSegment, originalSegment); + } + + @Override + protected Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception { + Text outputKey = new Text(); + long cuboidID = rowKeySplitter.split(tuple2._1.getBytes()); + + Cuboid cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID); + int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers()); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); + return new Tuple2<Text, Text>(outputKey, tuple2._2); + } + }); + + configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath)); + 
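Both cuboid subsets in this step are written back as SequenceFiles through the new-API Hadoop output, with the partition count fixed up front so the number of result files matches the computed reducer counts. Below is a self-contained sketch of that save path; the output directory and the local master are illustrative, and the directory must not already exist.

    // Sketch: writing a JavaPairRDD<Text, Text> through the new Hadoop API, the
    // same saveAsNewAPIHadoopDataset path used for each cuboid subset above.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class SaveAsSequenceFileSketch {

        public static void main(String[] args) throws Exception {
            try (JavaSparkContext sc = new JavaSparkContext("local[2]", "save-sketch")) {
                // build the Writable pairs on the executors, as the cubing jobs do
                JavaPairRDD<Text, Text> rdd = sc.parallelize(Arrays.asList("k1\tv1", "k2\tv2"))
                        .mapToPair(line -> {
                            String[] parts = line.split("\t");
                            return new Tuple2<>(new Text(parts[0]), new Text(parts[1]));
                        });

                Job job = Job.getInstance(sc.hadoopConfiguration());
                job.setOutputFormatClass(SequenceFileOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileOutputFormat.setOutputPath(job, new Path("/tmp/save-sketch-out"));

                // fix the file count first (coalesce/repartition), then hand off to Hadoop output
                rdd.coalesce(1).saveAsNewAPIHadoopDataset(job.getConfiguration());
            }
        }
    }

In the patch, configConvergeCuboidDataReduceOut() plays the role of the FileOutputFormat.setOutputPath call, pointing the shared Job at the target folder (and clearing it) before each save.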
mapBaseCuboIdRDD.repartition(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration()); + + //UpdateCuboidShard for oldCuboIds + JavaPairRDD<Text, Text> mapOldCuboIdRDD = oldCuboIdRDD.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() { + @Override + protected void doInit() { + initMethod(sConf, metaUrl, cubeName, optSegment, originalSegment); + } + + @Override + protected Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception { + Text outputKey = new Text(); + long cuboidID = rowKeySplitter.split(tuple2._1.getBytes()); + + Cuboid cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID); + int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers()); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); + return new Tuple2<Text, Text>(outputKey, tuple2._2); + } + }); + + configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath)); + mapOldCuboIdRDD.repartition(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration()); + + //SparkUtil.convergeCuboidDataReduce(mapRDD, cube, originalSegment, outputPath, metaUrl, config, sConf); + + } + } + + private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) { + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); + + int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId + int endIdx = startIdx + Long.bitCount(cuboid.getId()); + int offset = 0; + for (int i = startIdx; i < endIdx; i++) { + System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset, + splitBuffers[i].length()); + offset += splitBuffers[i].length(); + } + + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf = new ByteArray(newKeyBuf.length() * 2); + } + newKeyBuf.setLength(fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf); + + return fullKeySize; + } + + private void initMethod(SerializableConfiguration sConf, String metaUrl, String cubeName, CubeSegment optSegment, CubeSegment originalSegment) { + KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); + KylinConfig.setKylinConfigInEnvIfMissing(kylinConfig.exportToProperties()); + CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + CubeSegment originalSeg = cubeInstance.getSegmentById(originalSegment.getUuid()); + CubeSegment optSeg = cubeInstance.getSegmentById(optSegment.getUuid()); + rowKeySplitter = new RowKeySplitter(originalSeg); + rowKeyEncoderProvider = new RowKeyEncoderProvider(optSeg); + } +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java index d146c85..66ff734 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java @@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark; import java.io.DataInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; +import java.util.Locale; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -30,22 +32,31 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import 
org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.ShrunkenDictionary; import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.mr.IMROutput2; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.SerializableConfiguration; +import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.SourceManager; import org.apache.kylin.storage.StorageFactory; +import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -61,11 +72,16 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.hive.HiveUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; public class SparkUtil { private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class); + public static int getNormalRecordLogThreshold() { + return 1000; + } + public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) { IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg); return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc); @@ -83,8 +99,8 @@ public class SparkUtil { return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg); } - public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) { - return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg); + public static ISparkOutput.ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide2(CubeSegment seg) { + return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchOptimizeOutputSide(seg); } /** @@ -228,4 +244,81 @@ public class SparkUtil { return dictionaryMap; } + + public static SparkConf setKryoSerializerInConf() throws ClassNotFoundException { + Class[] kryoClassArray = new Class<?>[] { Class.forName("scala.reflect.ClassTag$$anon$1"), + Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey"), + Class.forName("scala.collection.mutable.WrappedArray$ofRef"), + Class.forName("org.apache.hadoop.io.Text") }; + + SparkConf sparkConf = new SparkConf(); + + //serialization conf + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator"); + sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray); + + return sparkConf; + } + + public static String generateFilePath(String subOutputPath, String jobOutPath) { + logger.info( + "ConvergeCuboidDataReduce Out Path" + String.format(Locale.ROOT, "%s%s", jobOutPath, subOutputPath)); + return 
String.format(Locale.ROOT, "%s%s", jobOutPath, subOutputPath); + } + + public static JavaPairRDD<ByteArray, Object[]> getCuboIdRDDFromHdfs(JavaSparkContext sc, final String metaUrl, + final String cubeName, final CubeSegment seg, final String inputPath, final int measureNum, + SerializableConfiguration sConf) { + + JavaPairRDD<Text, Text> rdd = sc.sequenceFile(inputPath, Text.class, Text.class); + // re-encode + return rdd.mapToPair(new ReEncodeCuboidFunction(cubeName, seg.getUuid(), metaUrl, sConf, measureNum)); + } + + static class ReEncodeCuboidFunction + extends SparkFunction.PairFunctionBase<Tuple2<Text, Text>, ByteArray, Object[]> { + private String cubeName; + private String sourceSegmentId; + private String metaUrl; + private SerializableConfiguration conf; + private int measureNum; + private transient KylinConfig kylinConfig; + private transient BufferedMeasureCodec segmentReEncoder = null; + + ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String metaUrl, SerializableConfiguration conf, + int measureNum) { + this.cubeName = cubeName; + this.sourceSegmentId = sourceSegmentId; + this.metaUrl = metaUrl; + this.conf = conf; + this.measureNum = measureNum; + } + + @Override + protected void doInit() { + this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl); + try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig + .setAndUnsetThreadLocalConfig(kylinConfig)) { + final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName()); + final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId); + this.segmentReEncoder = new BufferedMeasureCodec(cubeDesc.getMeasures()); + } + } + + @Override + public Tuple2<ByteArray, Object[]> doCall(Tuple2<Text, Text> textTextTuple2) throws Exception { + Object[] result = new Object[measureNum]; + segmentReEncoder.decode(ByteBuffer.wrap(textTextTuple2._2().getBytes(), 0, textTextTuple2._2().getLength()), + result); + return new Tuple2<ByteArray, Object[]>(new ByteArray(textTextTuple2._1().getBytes()), result); + } + } + + public static void configConvergeCuboidDataReduceOut(Job job, String output) throws IOException { + Path outputPath = new Path(output); + FileOutputFormat.setOutputPath(job, outputPath); + HadoopUtil.deletePath(job.getConfiguration(), outputPath); + } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java index 03ae3f2..1ea5f2c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java @@ -98,7 +98,7 @@ public class BulkLoadJob extends AbstractHadoopJob { int ret = 0; if (count > 0) { - logger.debug("Start to run LoadIncrementalHFiles"); + logger.debug("Start to run LoadIncrementalHFiles, File count is: " + count); ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs); logger.debug("End to run LoadIncrementalHFiles"); return ret; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index cfeec57..e9af5e0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -102,6 +102,7 @@ public class CreateHTableJob extends AbstractHadoopJob { // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName); if (buildingCuboids != null && !buildingCuboids.isEmpty()) { + logger.info("CreateHTableJob buildingCuboids size: " + buildingCuboids.size()); Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size()); for (Long cuboid : buildingCuboids) { Double cuboidSize = cuboidSizeMap.get(cuboid); @@ -109,6 +110,7 @@ public class CreateHTableJob extends AbstractHadoopJob { logger.warn("{} cuboid's size is null will replace by 0", cuboid); cuboidSize = 0.0; } + logger.info("Cuboid:" + cuboid + " size: " + cuboidSize); optimizedCuboidSizeMap.put(cuboid, cuboidSize); } cuboidSizeMap = optimizedCuboidSizeMap; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java index 3702a92..b0b47dd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java @@ -21,6 +21,7 @@ package org.apache.kylin.storage.hbase.steps; import java.util.List; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.engine.spark.ISparkOutput; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.slf4j.Logger; @@ -98,6 +99,29 @@ public class HBaseSparkOutputTransition implements ISparkOutput { } public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) { - return null; + return new ISparkBatchOptimizeOutputSide() { + HBaseSparkSteps steps = new HBaseSparkSteps(seg); + + @Override + public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND)); + } + + @Override + public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); + jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); + } + + @Override + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + steps.addOptimizeGarbageCollectionSteps(jobFlow); + } + + @Override + public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) { + steps.addCheckpointGarbageCollectionSteps(jobFlow); + } + }; } } \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java index d458b8c..807d23f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java @@ -82,9 +82,9 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true) - 
.withDescription("Cube Segment Id").create("segmentId"); + .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID); public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) - .withDescription("HDFS metadata url").create("metaUrl"); + .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
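SparkUtil.setKryoSerializerInConf(), added earlier in this patch, turns on Kryo with spark.kryo.registrationRequired=true so that any class nobody registered fails fast instead of silently falling back to slower default serialization. A trimmed, hedged version of that setup is sketched below; the class list here is only a sample, and the full list plus KylinKryoRegistrator come from Kylin itself.

    // Sketch: enabling Kryo serialization with mandatory class registration,
    // as SparkUtil.setKryoSerializerInConf() does. The registered classes are
    // a sample; real jobs need the complete list.
    import org.apache.spark.SparkConf;

    public class KryoConfSketch {

        public static SparkConf build() throws ClassNotFoundException {
            Class<?>[] kryoClasses = new Class<?>[] {
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("scala.collection.mutable.WrappedArray$ofRef")
            };
            return new SparkConf()
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator")
                    // fail fast on any unregistered class instead of silently
                    // falling back to default serialization
                    .set("spark.kryo.registrationRequired", "true")
                    .registerKryoClasses(kryoClasses);
        }
    }

Each new optimize step in this patch builds its SparkConf this way before setting the application name, attaching the KylinSparkJobListener, and opening the JavaSparkContext.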