This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new 444ae0604e  KYLIN-5184, fix cuboid statistic feature (#1877)
444ae0604e is described below

commit 444ae0604e8290cf18b5defddfdd6d79006f7501
Author: Tengting Xu <34978943+muk...@users.noreply.github.com>
AuthorDate: Thu Jun 16 10:58:50 2022 +0800

    KYLIN-5184, fix cuboid statistic feature (#1877)

    * KYLIN-5184, fix cuboid statistic feature

    * minor, adapt for cuboids without base cuboid

    * minor, adapt for cuboids with original logic

    * minor, adapt for cuboids with original logic

    * minor, adapt for cuboids with original logic
---
 .../java/org/apache/kylin/engine/mr/CubingJob.java |   6 +-
 .../kylin/engine/mr/common/CubeStatsReader.java    | 167 +++++++++++----------
 .../engine/mr/common/CuboidRecommenderUtil.java    |   4 +-
 .../engine/mr/common/CuboidStatsReaderUtil.java    |   6 +-
 .../kylin/engine/mr/common/MapReduceUtil.java      |   8 +-
 .../engine/mr/common/StatisticsDecisionUtil.java   |  16 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |  10 +-
 .../java/org/apache/kylin/cube/CubeSegment.java    |  58 +++++++
 .../kylin/cube/cuboid/DefaultCuboidScheduler.java  |   5 +-
 .../org/apache/kylin/cube/kv/CubeDimEncMap.java    |  10 +-
 .../job/impl/curator/CuratorLeaderSelector.java    |   5 +-
 .../org/apache/kylin/metadata/model/Segments.java  |  14 +-
 .../kylin/engine/spark/job/OptimizeBuildJob.java   |  17 ++-
 .../engine/spark/utils/UpdateMetadataUtil.java     |   4 +-
 .../engine/spark/builder/CubeMergeAssist.java      |   5 +-
 .../kylin/engine/spark/job/CubeBuildJob.java       |  18 ++-
 .../kylin/engine/spark/job/CubeMergeJob.java       |  14 +-
 .../engine/spark/job/ParentSourceChooser.scala     |  11 +-
 .../kylin/engine/spark2/NOptimizeJobTest.java      |   2 +-
 19 files changed, 246 insertions(+), 134 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index dc6350fd1b..bd391f29b8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -364,7 +364,7 @@ public class CubingJob extends DefaultChainedExecutable {
 
         String cuboidRootPath = getCuboidRootPath(seg, config);
         try {
-            estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
+            estimatedSizeMap = new CubeStatsReader(seg, config, true).getCuboidSizeMap(true);
         } catch (IOException e) {
             logger.warn("Cannot get segment {} estimated size map", seg.getName());
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 353e7f4b92..14f0cbf0ae 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -86,65 +86,73 @@ public class CubeStatsReader {
     final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
     final CuboidScheduler cuboidScheduler;
     public final long sourceRowCount;
+    private boolean isPrecise = false;
 
-    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
-        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig, boolean enableHll) throws IOException {
+        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig, enableHll);
     }
 
     /**
     * @param cuboidScheduler if it's null, part of it's functions will not be supported
     */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, boolean enableHll)
             throws IOException {
         this.seg = cubeSegment;
         this.cuboidScheduler = cuboidScheduler;
-        ResourceStore store = ResourceStore.getStore(kylinConfig);
-        String statsKey = cubeSegment.getStatisticsResourcePath();
-        RawResource resource = store.getResource(statsKey);
-        if (resource != null) {
-            File tmpSeqFile = writeTmpSeqFile(resource.content());
-            Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
-            logger.info("Reading statistics from {}", path);
-            CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
-            tmpSeqFile.delete();
-
-            this.samplingPercentage = cubeStatsResult.getPercentage();
-            this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-            this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-            this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-            this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
-        } else {
-            // throw new IllegalStateException("Missing resource at " + statsKey);
-            logger.warn("{} is not exists.", statsKey);
-            this.samplingPercentage = -1;
+
+        if (!enableHll && seg.getCuboidStaticsSize() != null
+                && !seg.getCuboidStaticsSize().isEmpty()
+                && seg.getCuboidStaticsRows() != null
+                && !seg.getCuboidStaticsRows().isEmpty()) {
+            logger.info("Reading precise statics from segment {}", seg.getUuid());
+            this.isPrecise = true;
+            this.samplingPercentage = 100;
             this.mapperNumberOfFirstBuild = -1;
             this.mapperOverlapRatioOfFirstBuild = -1.0;
             this.cuboidRowEstimatesHLL = null;
             this.sourceRowCount = -1L;
+        } else {
+            ResourceStore store = ResourceStore.getStore(kylinConfig);
+            String statsKey = seg.getStatisticsResourcePath();
+            RawResource resource = store.getResource(statsKey);
+            if (resource != null) {
+                File tmpSeqFile = writeTmpSeqFile(resource.content());
+                Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+                logger.info("Reading statistics from {}", path);
+                CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+                tmpSeqFile.delete();
+
+                this.samplingPercentage = cubeStatsResult.getPercentage();
+                this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+                this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
+                this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+                this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+            } else {
+                // throw new IllegalStateException("Missing resource at " + statsKey);
+                logger.warn("{} is not exists.", statsKey);
+                this.samplingPercentage = -1;
+                this.mapperNumberOfFirstBuild = -1;
+                this.mapperOverlapRatioOfFirstBuild = -1.0;
+                this.cuboidRowEstimatesHLL = null;
+                this.sourceRowCount = -1L;
+            }
         }
     }
 
-    /**
-     * Read statistics from
-     * @param path
-     * rather than
-     * @param cubeSegment
-     *
-     * Since the statistics are from
-     * @param path
-     * cuboid scheduler should be provided by default
-     */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
-            throws IOException {
-        CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+    public Map<Long, Long> getPreciseCuboidsRowsMap() {
+        return this.seg.getCuboidStaticsRows();
+    }
 
-        this.seg = cubeSegment;
-        this.cuboidScheduler = cuboidScheduler;
-        this.samplingPercentage = cubeStatsResult.getPercentage();
-        this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-        this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-        this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+    public Map<Long, Double> getPreciseCuboidsSizeMap() {
+        return handlePreciseCuboidsSize(this.seg.getCuboidStaticsSize());
+    }
+
+    private Map<Long, Double> handlePreciseCuboidsSize(Map<Long, Long> cuboidSizeMap) {
+        Map<Long, Double> sizeMap = Maps.newHashMap();
+        for (Map.Entry<Long, Long> entry: cuboidSizeMap.entrySet()) {
+            sizeMap.put(entry.getKey(), 1.0 * entry.getValue()/(1024L * 1024L));
+        }
+        return sizeMap;
     }
 
     private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -320,13 +328,9 @@ public class CubeStatsReader {
                 sizeMap.put(cuboidId, oriValue * rate);
             }
         }
-
         logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values()));
-
-        return;
     }
 
-
     /**
      * Estimate the cuboid's size
      *
@@ -353,8 +357,9 @@ public class CubeStatsReader {
         double percentileSpace = 0;
         int topNSpace = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
-            if (rowCount == 0)
+            if (rowCount == 0) {
                 break;
+            }
             DataType returnType = measureDesc.getFunction().getReturnDataType();
             if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
                 long estimateDistinctCount = sourceRowCount / rowCount;
@@ -381,19 +386,29 @@ public class CubeStatsReader {
     }
 
     private void print(PrintWriter out) {
-        Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL();
-        Map<Long, Double> cuboidSizes = getCuboidSizeMap();
+        Map<Long, Long> cuboidRows;
+        Map<Long, Double> cuboidSizes;
+        if (isPrecise) {
+            cuboidRows = getPreciseCuboidsRowsMap();
+            cuboidSizes = getPreciseCuboidsSizeMap();
+        } else {
+            cuboidRows = getCuboidRowEstimatesHLL();
+            cuboidSizes = getCuboidSizeMap();
+        }
         List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet());
         Collections.sort(cuboids);
+        String estimatedOrPrecise = isPrecise ? "precise" : "estimated";
"precise" : "estimated"; out.println("============================================================================"); out.println("Statistics of " + seg); out.println(); - out.println( - "Cube statistics hll precision: " + cuboidRowEstimatesHLL.values().iterator().next().getPrecision()); + if (!isPrecise) { + out.println("Cube statistics hll precision: " + + cuboidRowEstimatesHLL.values().iterator().next().getPrecision()); + } out.println("Total cuboids: " + cuboidRows.size()); - out.println("Total estimated rows: " + SumHelper.sumLong(cuboidRows.values())); - out.println("Total estimated size(MB): " + SumHelper.sumDouble(cuboidSizes.values())); + out.println("Total " + estimatedOrPrecise + " rows: " + SumHelper.sumLong(cuboidRows.values())); + out.println("Total " + estimatedOrPrecise + " size(MB): " + SumHelper.sumDouble(cuboidSizes.values())); out.println("Sampling percentage: " + samplingPercentage); out.println("Mapper overlap ratio: " + mapperOverlapRatioOfFirstBuild); out.println("Mapper number: " + mapperNumberOfFirstBuild); @@ -419,21 +434,14 @@ public class CubeStatsReader { return ret; } - public List<Long> getCuboidsByLayer(int level) { - if (cuboidScheduler == null) { - throw new UnsupportedOperationException("cuboid scheduler is null"); - } - List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); - return layeredCuboids.get(level); - } - private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) { if (cuboidScheduler == null) { throw new UnsupportedOperationException("cuboid scheduler is null"); } long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc()); int dimensionCount = Long.bitCount(baseCuboid); - printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out); + printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out, + this.isPrecise); } private void printKVInfo(PrintWriter writer) { @@ -445,19 +453,21 @@ public class CubeStatsReader { } private static void printCuboidInfoTree(long parent, long cuboidID, final CuboidScheduler scheduler, - Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { - printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out); + Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out, + boolean isPrecise) { + printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out, isPrecise); List<Long> children = scheduler.getSpanningCuboid(cuboidID); Collections.sort(children); for (Long child : children) { - printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1, out); + printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1, + out, isPrecise); } } private static void printOneCuboidInfo(long parent, long cuboidID, Map<Long, Long> cuboidRows, - Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { + Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out, boolean isPrecise) { StringBuffer sb = new StringBuffer(); for (int i = 0; i < depth; i++) { sb.append(" "); @@ -465,16 +475,21 @@ public class CubeStatsReader { String cuboidName = Cuboid.getDisplayName(cuboidID, dimensionCount); sb.append("|---- Cuboid ").append(cuboidName); - long rowCount = cuboidRows.get(cuboidID); - double size = 
-        double size = cuboidSizes.get(cuboidID);
-        sb.append(", est row: ").append(rowCount).append(", est MB: ").append(formatDouble(size));
+        long rowCount = cuboidRows.get(cuboidID) == null ? 0: cuboidRows.get(cuboidID);
+        double size = cuboidSizes.get(cuboidID)== null ? 0.0: cuboidSizes.get(cuboidID);
+        String markPreciseOrEstimate = isPrecise ? "precise" : "est";
+        sb.append(", ").append(markPreciseOrEstimate).append(" row: ").append(rowCount)
+                .append(", ").append(markPreciseOrEstimate).append(" MB: ")
+                .append(formatDouble(size));
 
         if (parent != -1) {
-            sb.append(", shrink: ").append(formatDouble(100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent)))
-                    .append("%");
+            double shrink = -0.0;
+            if (cuboidRows.get(parent) != null) {
+                shrink = 100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent);
+            }
+            sb.append(", shrink: ").append(formatDouble(shrink)).append("%");
         }
-
-        out.println(sb.toString());
+        out.println(sb);
     }
 
     private static String formatDouble(double input) {
@@ -544,7 +559,7 @@ public class CubeStatsReader {
                 new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)));
         for (CubeSegment seg : segments) {
             try {
-                new CubeStatsReader(seg, config).print(out);
+                new CubeStatsReader(seg, config, false).print(out);
             } catch (Exception e) {
                 logger.info("CubeStatsReader for Segment {} failed, skip it.", seg.getName());
             }
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 71d2fd1a6a..f989a68258 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -43,7 +43,7 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
@@ -115,7 +115,7 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index a5fbe2b75b..9cf50f7c1c 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil {
         List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY);
         Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(statistics.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, Double> pSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(pSegment, statistics,
                     pReader.sourceRowCount);
             for (Long pCuboid : statistics.keySet()) {
@@ -102,7 +102,7 @@ public class CuboidStatsReaderUtil {
         Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowHLLCounters();
             if (pHLLMap == null || pHLLMap.isEmpty()) {
                 logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled.");
@@ -147,7 +147,7 @@ public class CuboidStatsReaderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index ecde4aa7ba..9a6a550682 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,7 +67,7 @@ public class MapReduceUtil {
         logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio
                 + ", level " + level);
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig, true);
 
         double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
 
@@ -111,7 +111,7 @@ public class MapReduceUtil {
             throws IOException {
         KylinConfig kylinConfig = cubeSeg.getConfig();
 
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig, true).getCuboidSizeMap();
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index dc96c56c4a..3974b652a8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -19,11 +19,11 @@
 package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -39,7 +39,7 @@ public class StatisticsDecisionUtil {
     protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class);
 
     public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException {
-        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig());
+        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig(), true);
         decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(),
                 cubeStats.getMapperNumberOfFirstBuild());
     }
@@ -95,15 +95,14 @@ public class StatisticsDecisionUtil {
 
     // For triggering cube planner phase one
    public static Map<Long, Long> optimizeCubingPlan(CubeSegment segment) throws IOException {
-        if (isAbleToOptimizeCubingPlan(segment)) {
-            logger.info("It's able to trigger cuboid planner algorithm.");
-        } else {
-            return new HashMap<>();
+        if (!isAbleToOptimizeCubingPlan(segment)) {
+            return Maps.newHashMap();
         }
+        logger.info("It's able to trigger cuboid planner algorithm.");
 
         Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment);
         if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) {
-            return new HashMap<>();
+            return Maps.newHashMap();
         }
 
         CubeInstance cube = segment.getCubeInstance();
@@ -115,8 +114,9 @@ public class StatisticsDecisionUtil {
 
     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
         CubeInstance cube = segment.getCubeInstance();
-        if (!cube.getConfig().isCubePlannerEnabled())
+        if (!cube.getConfig().isCubePlannerEnabled()) {
             return false;
+        }
 
         if (cube.getSegments(SegmentStatusEnum.READY_PENDING).size() > 0) {
             logger.info("Has read pending segments and will not enable cube planner.");
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 838c8233df..d24ecd480c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -371,16 +371,18 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry, boolean isLocal) throws IOException {
-        if (update == null || update.getCubeInstance() == null)
+        if (update == null || update.getCubeInstance() == null) {
             throw new IllegalStateException();
+        }
 
         CubeInstance cube = update.getCubeInstance();
         logger.info("Updating cube instance '{}'", cube.getName());
 
         Segments<CubeSegment> newSegs = (Segments) cube.getSegments().clone();
 
-        if (update.getToAddSegs() != null)
+        if (update.getToAddSegs() != null) {
             newSegs.addAll(Arrays.asList(update.getToAddSegs()));
+        }
 
         List<String> toRemoveResources = Lists.newArrayList();
         if (update.getToRemoveSegs() != null) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 6f54e1eb98..b6d134a079 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.cube;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
@@ -30,9 +32,12 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -139,6 +144,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
 
+    @JsonProperty("cuboid_statics_rows_bytes")
+    private byte[] cuboidStaticsRowsBytes;
+
+    @JsonProperty("cuboid_statics_size_bytes")
+    private byte[] cuboidStaticsSizeBytes;
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
 
     // lazy init
@@ -690,6 +701,53 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         copy.dimensionRangeInfoMap = other.dimensionRangeInfoMap == null ? null
                 : Maps.newHashMap(other.dimensionRangeInfoMap);
         copy.binarySignature = other.binarySignature;
+        copy.cuboidStaticsRowsBytes = other.cuboidStaticsRowsBytes;
+        copy.cuboidStaticsSizeBytes = other.cuboidStaticsSizeBytes;
         return copy;
     }
+
+    public void setCuboidStaticsRowsBytes(Map<Long, Long> staticsOfRows) {
+        this.cuboidStaticsRowsBytes = compressedCuboids(staticsOfRows);
+    }
+
+    public void setCuboidStaticsSizeBytes(Map<Long, Long> staticsOfSize) {
+        this.cuboidStaticsSizeBytes = compressedCuboids(staticsOfSize);
+    }
+
+    public Map<Long, Long> getCuboidStaticsRows() {
+        return this.decompressCuboids(this.cuboidStaticsRowsBytes);
+    }
+
+    public Map<Long, Long> getCuboidStaticsSize() {
+        return this.decompressCuboids(this.cuboidStaticsSizeBytes);
+    }
+
+    private Map<Long, Long> decompressCuboids(byte[] cuboidStaticsBytes) {
+        if (cuboidStaticsBytes == null) {
+            return null;
+        }
+        byte[] uncompressed;
+        try {
+            uncompressed = CompressionUtils.decompress(cuboidStaticsBytes);
+            String str = new String(uncompressed, StandardCharsets.UTF_8);
+            TypeReference<Map<Long, Long>> typeRef = new TypeReference<Map<Long, Long>>() {};
+            Map<Long, Long> cuboids = JsonUtil.readValue(str, typeRef);
+            return cuboids.isEmpty() ? null : cuboids;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] compressedCuboids(Map<Long, Long> cuboids) {
+        if (cuboids == null || cuboids.isEmpty()) {
+            return null;
+        }
+
+        try {
+            String str = JsonUtil.writeValueAsString(cuboids);
+            return CompressionUtils.compress(str.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
index a5585f6d89..07e2030f7f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -289,6 +289,7 @@ public class DefaultCuboidScheduler extends CuboidScheduler {
      * @param agg agg group
      * @return cuboidId list
      */
+    @Override
     public Set<Long> calculateCuboidsForAggGroup(AggregationGroup agg) {
         Set<Long> cuboidHolder = new HashSet<>();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index 64624cad89..7d71c34388 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -83,10 +83,10 @@ public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializabl
 
     @Override
     public Dictionary<String> getDictionary(TblColRef col) {
-//        if (seg == null)
+        if (dictionaryMap == null) {
+            return null;
+        }
         return dictionaryMap.get(col);
-//        else
-//            return seg.getDictionary(col);
     }
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
index f67cf454e7..d6e9532e1e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
@@ -109,8 +109,9 @@ public class CuratorLeaderSelector extends LeaderSelectorListenerAdapter impleme
             logger.error("Other exception occurred when initialization DefaultScheduler:", th);
         } finally {
             logger.warn(this.name + " relinquishing leadership.");
-            if (defaultScheduler != null)
+            if (defaultScheduler != null) {
                 defaultScheduler.shutdown();
+            }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index 1a2b7d7e76..a3c17fa71e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -109,8 +109,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         for (int i = this.size() - 1; i >= 0; i--) {
             T seg = this.get(i);
             if (seg.getLastBuildTime() > 0) {
-                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime())
+                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime()) {
                     latest = seg;
+                }
             }
         }
         return latest;
@@ -277,10 +278,10 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
             }
         }
 
-        if (result.size() <= 1)
+        if (result.size() <= 1) {
             return null;
-        else
-            return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
+        }
+        return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
     }
 
     /**
@@ -295,8 +296,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         if (newSegment != null && !tobe.contains(newSegment)) {
             tobe.add(newSegment);
         }
-        if (tobe.size() == 0)
+        if (tobe.size() == 0) {
             return tobe;
+        }
 
         // sort by source offset
         Collections.sort(tobe);
@@ -468,7 +470,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
 
     public Pair<Boolean, Boolean> fitInSegments(ISegment newOne) {
         if (this.isEmpty()) {
-            return Pair.newPair(false, false); 
+            return Pair.newPair(false, false);
         }
 
         ISegment first = this.get(0);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 8a412d5d53..a0598f818f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -74,12 +74,13 @@ import java.util.stream.Collectors;
 public class OptimizeBuildJob extends SparkApplication {
     private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class);
 
-    private Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
     protected static String TEMP_DIR_SUFFIX = "_temp";
 
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows;
+    Map<Long, Long> cuboidIdToPreciseSize;
 
     private Configuration conf = HadoopUtil.getCurrentConfiguration();
     private CubeManager cubeManager;
@@ -105,6 +106,9 @@ public class OptimizeBuildJob extends SparkApplication {
         optSeg = cubeInstance.getSegmentById(segmentId);
         originalSeg = cubeInstance.getOriginalSegmentToOptimize(optSeg);
         originalSegInfo = ManagerHub.getSegmentInfo(config, cubeId, originalSeg.getUuid());
+        // copy the original statistics
+        cuboidIdToPreciseSize = originalSeg.getCuboidStaticsSize();
+        cuboidIdToPreciseRows = originalSeg.getCuboidStaticsRows();
 
         calculateCuboidFromBaseCuboid();
         buildCuboidFromParent(cubeId);
@@ -150,10 +154,11 @@ public class OptimizeBuildJob extends SparkApplication {
         SpanningTree spanningTree;
         ParentSourceChooser sourceChooser;
         try {
-            spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts()));
+            Collection<LayoutEntity> cuboids = JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts());
+            spanningTree = new ForestSpanningTree(cuboids);
             logger.info("There are {} cuboids to be built in segment {}.",
                     optSegInfo.toBuildLayouts().size(), optSegInfo.name());
-            for (LayoutEntity cuboid : JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())) {
+            for (LayoutEntity cuboid : cuboids) {
                 logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
                         Joiner.on(", ").join(cuboid.getOrderedDimensions().keySet()));
             }
@@ -205,6 +210,10 @@ public class OptimizeBuildJob extends SparkApplication {
         }
         optSeg.setCuboidShardNums(cuboidShardNum);
         optSeg.setInputRecordsSize(originalSeg.getInputRecordsSize());
+
+        optSeg.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
+        optSeg.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+
         Map<String, String> additionalInfo = optSeg.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         optSeg.setAdditionalInfo(additionalInfo);
@@ -407,6 +416,8 @@ public class OptimizeBuildJob extends SparkApplication {
         cuboidShardNum.put(layoutId, (short) shardNum);
         JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
     }
 
     private void updateExistingLayout(LayoutEntity layout, long parentId) throws IOException {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 86e256f4ac..11291b3698 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -112,8 +112,8 @@ public class UpdateMetadataUtil {
         toUpdateSeg.getSnapshots().putAll(origSeg.getSnapshots());
         toUpdateSeg.getRowkeyStats().addAll(origSeg.getRowkeyStats());
 
-        CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config);
-        CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config);
+        CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config, true);
+        CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config, true);
         Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
         if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
             logger.warn(
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
index c2b59e7e58..e142f73163 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
@@ -85,11 +85,10 @@ public class CubeMergeAssist implements Serializable {
 
             if (mergeDataset == null) {
                 mergeDataset = layoutDataset;
-            } else
+            } else {
                 mergeDataset = mergeDataset.union(layoutDataset);
+            }
         }
         return mergeDataset;
-
     }
-
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index a5f0e11403..f7b528f27c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -89,6 +89,8 @@ public class CubeBuildJob extends SparkApplication {
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
     private Map<Long, Long> recommendCuboidMap = new HashMap<>();
 
     public static void main(String[] args) {
@@ -124,7 +126,7 @@ public class CubeBuildJob extends SparkApplication {
             long startMills = System.currentTimeMillis();
             spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(statisticsSeg.toBuildLayouts()));
             sourceChooser = new ParentSourceChooser(spanningTree, statisticsSeg, newSegment, jobId, ss, config, false);
-            sourceChooser.setNeedStatistics();
+            sourceChooser.toStatistics();
             sourceChooser.decideFlatTableSource(null);
             Map<Long, HLLCounter> hllMap = new HashMap<>();
             for (Tuple2<Object, AggInfo> cuboidData : sourceChooser.aggInfo()) {
@@ -231,6 +233,8 @@ public class CubeBuildJob extends SparkApplication {
                 long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId()));
                 deviation = diff / (layoutEntity.getRows() + 0.0d);
             }
+
+            collectPreciseStatics(cuboidIdToPreciseRows, cuboidIdToPreciseSize, layoutEntity);
             cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(),
                     layoutEntity.getRows(), layoutEntity.getByteSize(), deviation));
         }
@@ -258,6 +262,8 @@ public class CubeBuildJob extends SparkApplication {
         segment.setInputRecords(sourceRowCount);
         segment.setSnapshots(new ConcurrentHashMap<>(segmentInfo.getSnapShot2JavaMap()));
         segment.setCuboidShardNums(cuboidShardNum);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
@@ -266,6 +272,13 @@ public class CubeBuildJob extends SparkApplication {
         cubeManager.updateCube(update, true);
     }
 
+    private void collectPreciseStatics(Map<Long, Long> cuboidIdToPreciseRows, Map<Long, Long> cuboidIdToPreciseSize,
+            LayoutEntity layoutEntity) {
+        // Hold the precise statics of cuboid in segment
+        cuboidIdToPreciseRows.put(layoutEntity.getId(), layoutEntity.getRows());
+        cuboidIdToPreciseSize.put(layoutEntity.getId(), layoutEntity.getByteSize());
+    }
+
     private void collectPersistedTablePath(List<String> persistedFlatTable, ParentSourceChooser sourceChooser) {
         String flatTablePath = sourceChooser.persistFlatTableIfNecessary();
         if (!flatTablePath.isEmpty()) {
@@ -279,8 +292,9 @@ public class CubeBuildJob extends SparkApplication {
         CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
         CubeUpdate update = new CubeUpdate(cubeCopy);
 
-        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
+        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) {
             update.setCuboids(recommendCuboidMap);
+        }
 
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 16fecd385c..6d7e10e1df 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -61,6 +61,8 @@ public class CubeMergeJob extends SparkApplication {
     private List<CubeSegment> mergingSegments = Lists.newArrayList();
     private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
 
     @Override
     protected void doExecute() throws Exception {
@@ -134,7 +136,6 @@ public class CubeMergeJob extends SparkApplication {
             for (int i = 0; i < cuboids.size(); i++) {
                 LayoutEntity cuboid = cuboids.apply(i);
                 long layoutId = cuboid.getId();
-
                 CubeMergeAssist assist = mergeCuboidsAssist.get(layoutId);
                 if (assist == null) {
                     assist = new CubeMergeAssist();
@@ -191,7 +192,8 @@ public class CubeMergeJob extends SparkApplication {
 
         JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
-
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
         return layout;
     }
 
@@ -202,9 +204,9 @@ public class CubeMergeJob extends SparkApplication {
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeSegment segment = cubeCopy.getSegmentById(segmentId);
 
-        long totalSourceSize = 0l;
-        long totalInputRecords = 0l;
-        long totalInputRecordsSize = 0l;
+        long totalSourceSize = 0L;
+        long totalInputRecords = 0L;
+        long totalInputRecordsSize = 0L;
         for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
             totalSourceSize += assist.getLayout().getByteSize();
         }
@@ -221,6 +223,8 @@ public class CubeMergeJob extends SparkApplication {
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         cubeSegments.add(segment);
         update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
         cubeManager.updateCube(update, true);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 0f77c768a9..a82a82144d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -50,15 +50,20 @@ class ParentSourceChooser(
   // build from flatTable.
   var flatTableSource: NBuildSourceInfo = _
 
-  private var needStatistics = false
+  private var needStatistics: Boolean = false
 
   //TODO: MetadataConverter don't have getCubeDesc() now
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
    MetadataConverter.getCubeDesc(segInfo.getCube),
    ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree))*/
 
-  def setNeedStatistics(): Unit =
-    needStatistics = true
+  def toStatistics(): Unit = {
+    this.needStatistics = true
+  }
+
+  def cancelStatistics(): Unit = {
+    this.needStatistics = false
+  }
 
   def getAggInfo : Array[(Long, AggInfo)] = aggInfo
 
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
index 5e5789c3eb..6fdea85947 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
@@ -110,7 +110,7 @@ public class NOptimizeJobTest extends LocalWithSparkSessionTest {
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         for (CubeSegment segment : cube.getSegments()) {
             Assert.assertEquals(SegmentStatusEnum.READY, segment.getStatus());
-            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config);
+            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config, true);
             Assert.assertEquals(recommendCuboids, segStatsReader.getCuboidRowHLLCounters().keySet());
             String cuboidPath = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
                     segment.getStorageLocationIdentifier());
             Assert.assertTrue(fs.exists(new Path(cuboidPath)));
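
The heart of this patch: build, merge, and optimize jobs now persist exact per-cuboid row counts and byte sizes on the CubeSegment, stored as compressed JSON in the new cuboid_statics_rows_bytes / cuboid_statics_size_bytes fields, and CubeStatsReader prefers those numbers over the sampled HLL estimates when constructed with enableHll = false. Below is a minimal sketch of that storage round-trip, with java.util.zip and Jackson standing in for Kylin's CompressionUtils and JsonUtil (the stand-ins are assumptions made for a self-contained example; the patch itself calls the helpers from org.apache.kylin.common.util):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CuboidStatsRoundTrip {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Serialize the per-cuboid map to JSON, then deflate it -- same shape as
    // CubeSegment.compressedCuboids(), with Deflater as a stand-in for
    // CompressionUtils.compress().
    static byte[] compress(Map<Long, Long> cuboids) throws Exception {
        byte[] json = MAPPER.writeValueAsBytes(cuboids);
        Deflater deflater = new Deflater();
        deflater.setInput(json);
        deflater.finish();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            bos.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return bos.toByteArray();
    }

    // Inflate and re-parse -- same shape as CubeSegment.decompressCuboids().
    static Map<Long, Long> decompress(byte[] bytes) throws Exception {
        Inflater inflater = new Inflater();
        inflater.setInput(bytes);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!inflater.finished()) {
            bos.write(buf, 0, inflater.inflate(buf));
        }
        inflater.end();
        String json = new String(bos.toByteArray(), StandardCharsets.UTF_8);
        return MAPPER.readValue(json, new TypeReference<Map<Long, Long>>() {});
    }

    public static void main(String[] args) throws Exception {
        Map<Long, Long> rows = new HashMap<>();
        rows.put(255L, 1_000_000L); // cuboid id -> exact row count from the build
        rows.put(31L, 120_000L);
        byte[] stored = compress(rows);          // what would land in "cuboid_statics_rows_bytes"
        System.out.println(decompress(stored));  // {31=120000, 255=1000000}
    }
}

The design mirrors the patch: the maps stay compact in segment metadata at rest, at the cost of a decompress-and-parse on each getCuboidStaticsRows() / getCuboidStaticsSize() call.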
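On the read side, the boolean added to every CubeStatsReader construction in this diff picks which statistics back the reader. A hedged usage sketch follows; the class and method names come from the diff above, but the harness itself is hypothetical and only runs inside a Kylin deployment:

import java.io.IOException;
import java.util.Map;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.CubeStatsReader;

public class CuboidStatsPrinter {

    // Prefer the precise statistics persisted by a Spark build/merge/optimize
    // job; fall back to sampling-based estimates, as print() now does.
    static void printCuboidStats(CubeSegment segment, KylinConfig config) throws IOException {
        CubeStatsReader reader = new CubeStatsReader(segment, config, false); // enableHll = false
        Map<Long, Long> rows;
        Map<Long, Double> sizeMB;
        if (segment.getCuboidStaticsRows() != null && segment.getCuboidStaticsSize() != null) {
            rows = reader.getPreciseCuboidsRowsMap();    // exact row counts
            sizeMB = reader.getPreciseCuboidsSizeMap();  // byte sizes scaled to MB
        } else {
            rows = reader.getCuboidRowEstimatesHLL();    // sampled HLL estimates
            sizeMB = reader.getCuboidSizeMap();
        }
        if (rows == null || sizeMB == null) {
            return; // segment has no statistics at all (see the "{} is not exists." branch)
        }
        for (Map.Entry<Long, Long> e : rows.entrySet()) {
            System.out.printf("cuboid %d: %d rows, %.2f MB%n", e.getKey(), e.getValue(), sizeMB.get(e.getKey()));
        }
    }
}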