This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 444ae0604e KYLIN-5184, fix cuboid statistic feature (#1877)
444ae0604e is described below
commit 444ae0604e8290cf18b5defddfdd6d79006f7501
Author: Tengting Xu <[email protected]>
AuthorDate: Thu Jun 16 10:58:50 2022 +0800
KYLIN-5184, fix cuboid statistic feature (#1877)
* KYLIN-5184, fix cuboid statistic feature
* minor, adapt for cuboids without base cuboid
* minor, adapt for cuboids with original logic
* minor, adapt for cuboids with original logic
* minor, adapt for cuboids with original logic
---
.../java/org/apache/kylin/engine/mr/CubingJob.java | 6 +-
.../kylin/engine/mr/common/CubeStatsReader.java | 167 +++++++++++----------
.../engine/mr/common/CuboidRecommenderUtil.java | 4 +-
.../engine/mr/common/CuboidStatsReaderUtil.java | 6 +-
.../kylin/engine/mr/common/MapReduceUtil.java | 8 +-
.../engine/mr/common/StatisticsDecisionUtil.java | 16 +-
.../java/org/apache/kylin/cube/CubeManager.java | 10 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 58 +++++++
.../kylin/cube/cuboid/DefaultCuboidScheduler.java | 5 +-
.../org/apache/kylin/cube/kv/CubeDimEncMap.java | 10 +-
.../job/impl/curator/CuratorLeaderSelector.java | 5 +-
.../org/apache/kylin/metadata/model/Segments.java | 14 +-
.../kylin/engine/spark/job/OptimizeBuildJob.java | 17 ++-
.../engine/spark/utils/UpdateMetadataUtil.java | 4 +-
.../engine/spark/builder/CubeMergeAssist.java | 5 +-
.../kylin/engine/spark/job/CubeBuildJob.java | 18 ++-
.../kylin/engine/spark/job/CubeMergeJob.java | 14 +-
.../engine/spark/job/ParentSourceChooser.scala | 11 +-
.../kylin/engine/spark2/NOptimizeJobTest.java | 2 +-
19 files changed, 246 insertions(+), 134 deletions(-)
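The core of the change below: CubeStatsReader gains an enableHll flag. When
the flag is false and the segment carries persisted precise per-cuboid
statistics (getCuboidStaticsRows / getCuboidStaticsSize), the reader serves
exact row counts and sizes; otherwise it falls back to the original HLL-based
estimates. A minimal sketch of the new call pattern, assuming a hypothetical
printStats helper (only CubeStatsReader, CubeSegment and KylinConfig come from
this commit):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.common.CubeStatsReader;

    public class StatsReaderSketch {
        static void printStats(CubeSegment seg, KylinConfig config) throws IOException {
            // enableHll = false prefers the precise per-cuboid statistics persisted on
            // the segment; when none exist the reader falls back to the HLL estimates.
            CubeStatsReader reader = new CubeStatsReader(seg, config, false);
            Map<Long, Long> rows = reader.getPreciseCuboidsRowsMap(); // null when falling back
            if (rows != null) {
                Map<Long, Double> sizes = reader.getPreciseCuboidsSizeMap(); // cuboid id -> MB
                rows.forEach((cuboid, n) ->
                        System.out.println(cuboid + ": " + n + " rows, " + sizes.get(cuboid) + " MB"));
            }
        }
    }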
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index dc6350fd1b..bd391f29b8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -364,7 +364,7 @@ public class CubingJob extends DefaultChainedExecutable {
         String cuboidRootPath = getCuboidRootPath(seg, config);
         try {
-            estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
+            estimatedSizeMap = new CubeStatsReader(seg, config, true).getCuboidSizeMap(true);
         } catch (IOException e) {
             logger.warn("Cannot get segment {} estimated size map", seg.getName());
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 353e7f4b92..14f0cbf0ae 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -86,65 +86,73 @@ public class CubeStatsReader {
     final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
     final CuboidScheduler cuboidScheduler;
     public final long sourceRowCount;
+    private boolean isPrecise = false;

-    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
-        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig, boolean enableHll) throws IOException {
+        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig, enableHll);
     }

     /**
      * @param cuboidScheduler if it's null, part of it's functions will not be supported
      */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, boolean enableHll)
             throws IOException {
         this.seg = cubeSegment;
         this.cuboidScheduler = cuboidScheduler;
-        ResourceStore store = ResourceStore.getStore(kylinConfig);
-        String statsKey = cubeSegment.getStatisticsResourcePath();
-        RawResource resource = store.getResource(statsKey);
-        if (resource != null) {
-            File tmpSeqFile = writeTmpSeqFile(resource.content());
-            Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
-            logger.info("Reading statistics from {}", path);
-            CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
-            tmpSeqFile.delete();
-
-            this.samplingPercentage = cubeStatsResult.getPercentage();
-            this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-            this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-            this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-            this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
-        } else {
-            // throw new IllegalStateException("Missing resource at " + statsKey);
-            logger.warn("{} is not exists.", statsKey);
-            this.samplingPercentage = -1;
+
+        if (!enableHll && seg.getCuboidStaticsSize() != null
+                && !seg.getCuboidStaticsSize().isEmpty()
+                && seg.getCuboidStaticsRows() != null
+                && !seg.getCuboidStaticsRows().isEmpty()) {
+            logger.info("Reading precise statics from segment {}", seg.getUuid());
+            this.isPrecise = true;
+            this.samplingPercentage = 100;
             this.mapperNumberOfFirstBuild = -1;
             this.mapperOverlapRatioOfFirstBuild = -1.0;
             this.cuboidRowEstimatesHLL = null;
             this.sourceRowCount = -1L;
+        } else {
+            ResourceStore store = ResourceStore.getStore(kylinConfig);
+            String statsKey = seg.getStatisticsResourcePath();
+            RawResource resource = store.getResource(statsKey);
+            if (resource != null) {
+                File tmpSeqFile = writeTmpSeqFile(resource.content());
+                Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+                logger.info("Reading statistics from {}", path);
+                CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+                tmpSeqFile.delete();
+
+                this.samplingPercentage = cubeStatsResult.getPercentage();
+                this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+                this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
+                this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+                this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+            } else {
+                // throw new IllegalStateException("Missing resource at " + statsKey);
+                logger.warn("{} is not exists.", statsKey);
+                this.samplingPercentage = -1;
+                this.mapperNumberOfFirstBuild = -1;
+                this.mapperOverlapRatioOfFirstBuild = -1.0;
+                this.cuboidRowEstimatesHLL = null;
+                this.sourceRowCount = -1L;
+            }
         }
     }

-    /**
-     * Read statistics from
-     * @param path
-     * rather than
-     * @param cubeSegment
-     *
-     * Since the statistics are from
-     * @param path
-     * cuboid scheduler should be provided by default
-     */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
-            throws IOException {
-        CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+    public Map<Long, Long> getPreciseCuboidsRowsMap() {
+        return this.seg.getCuboidStaticsRows();
+    }

-        this.seg = cubeSegment;
-        this.cuboidScheduler = cuboidScheduler;
-        this.samplingPercentage = cubeStatsResult.getPercentage();
-        this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-        this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-        this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+    public Map<Long, Double> getPreciseCuboidsSizeMap() {
+        return handlePreciseCuboidsSize(this.seg.getCuboidStaticsSize());
+    }
+
+    private Map<Long, Double> handlePreciseCuboidsSize(Map<Long, Long> cuboidSizeMap) {
+        Map<Long, Double> sizeMap = Maps.newHashMap();
+        for (Map.Entry<Long, Long> entry : cuboidSizeMap.entrySet()) {
+            sizeMap.put(entry.getKey(), 1.0 * entry.getValue() / (1024L * 1024L));
+        }
+        return sizeMap;
     }
private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -320,13 +328,9 @@ public class CubeStatsReader {
                 sizeMap.put(cuboidId, oriValue * rate);
             }
         }
-
         logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values()));
-
-        return;
     }
-
/**
* Estimate the cuboid's size
*
@@ -353,8 +357,9 @@ public class CubeStatsReader {
         double percentileSpace = 0;
         int topNSpace = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
-            if (rowCount == 0)
+            if (rowCount == 0) {
                 break;
+            }
             DataType returnType = measureDesc.getFunction().getReturnDataType();
             if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
                 long estimateDistinctCount = sourceRowCount / rowCount;
@@ -381,19 +386,29 @@ public class CubeStatsReader {
}
     private void print(PrintWriter out) {
-        Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL();
-        Map<Long, Double> cuboidSizes = getCuboidSizeMap();
+        Map<Long, Long> cuboidRows;
+        Map<Long, Double> cuboidSizes;
+        if (isPrecise) {
+            cuboidRows = getPreciseCuboidsRowsMap();
+            cuboidSizes = getPreciseCuboidsSizeMap();
+        } else {
+            cuboidRows = getCuboidRowEstimatesHLL();
+            cuboidSizes = getCuboidSizeMap();
+        }
         List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet());
         Collections.sort(cuboids);
+        String estimatedOrPrecise = isPrecise ? "precise" : "estimated";
         out.println("============================================================================");
         out.println("Statistics of " + seg);
         out.println();
-        out.println(
-                "Cube statistics hll precision: " + cuboidRowEstimatesHLL.values().iterator().next().getPrecision());
+        if (!isPrecise) {
+            out.println("Cube statistics hll precision: "
+                    + cuboidRowEstimatesHLL.values().iterator().next().getPrecision());
+        }
         out.println("Total cuboids: " + cuboidRows.size());
-        out.println("Total estimated rows: " + SumHelper.sumLong(cuboidRows.values()));
-        out.println("Total estimated size(MB): " + SumHelper.sumDouble(cuboidSizes.values()));
+        out.println("Total " + estimatedOrPrecise + " rows: " + SumHelper.sumLong(cuboidRows.values()));
+        out.println("Total " + estimatedOrPrecise + " size(MB): " + SumHelper.sumDouble(cuboidSizes.values()));
         out.println("Sampling percentage: " + samplingPercentage);
         out.println("Mapper overlap ratio: " + mapperOverlapRatioOfFirstBuild);
         out.println("Mapper number: " + mapperNumberOfFirstBuild);
@@ -419,21 +434,14 @@ public class CubeStatsReader {
         return ret;
     }

-    public List<Long> getCuboidsByLayer(int level) {
-        if (cuboidScheduler == null) {
-            throw new UnsupportedOperationException("cuboid scheduler is null");
-        }
-        List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
-        return layeredCuboids.get(level);
-    }
-
     private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
         if (cuboidScheduler == null) {
             throw new UnsupportedOperationException("cuboid scheduler is null");
         }
         long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
         int dimensionCount = Long.bitCount(baseCuboid);
-        printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
+        printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out,
+                this.isPrecise);
     }
private void printKVInfo(PrintWriter writer) {
@@ -445,19 +453,21 @@ public class CubeStatsReader {
}
     private static void printCuboidInfoTree(long parent, long cuboidID, final CuboidScheduler scheduler,
-            Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) {
-        printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out);
+            Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out,
+            boolean isPrecise) {
+        printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out, isPrecise);
         List<Long> children = scheduler.getSpanningCuboid(cuboidID);
         Collections.sort(children);
         for (Long child : children) {
-            printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1, out);
+            printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1,
+                    out, isPrecise);
         }
     }
     private static void printOneCuboidInfo(long parent, long cuboidID, Map<Long, Long> cuboidRows,
-            Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) {
+            Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out, boolean isPrecise) {
         StringBuffer sb = new StringBuffer();
         for (int i = 0; i < depth; i++) {
             sb.append("    ");
         }
@@ -465,16 +475,21 @@ public class CubeStatsReader {
         String cuboidName = Cuboid.getDisplayName(cuboidID, dimensionCount);
         sb.append("|---- Cuboid ").append(cuboidName);

-        long rowCount = cuboidRows.get(cuboidID);
-        double size = cuboidSizes.get(cuboidID);
-        sb.append(", est row: ").append(rowCount).append(", est MB: ").append(formatDouble(size));
+        long rowCount = cuboidRows.get(cuboidID) == null ? 0 : cuboidRows.get(cuboidID);
+        double size = cuboidSizes.get(cuboidID) == null ? 0.0 : cuboidSizes.get(cuboidID);
+        String markPreciseOrEstimate = isPrecise ? "precise" : "est";
+        sb.append(", ").append(markPreciseOrEstimate).append(" row: ").append(rowCount)
+                .append(", ").append(markPreciseOrEstimate).append(" MB: ")
+                .append(formatDouble(size));

         if (parent != -1) {
-            sb.append(", shrink: ").append(formatDouble(100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent)))
-                    .append("%");
+            double shrink = -0.0;
+            if (cuboidRows.get(parent) != null) {
+                shrink = 100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent);
+            }
+            sb.append(", shrink: ").append(formatDouble(shrink)).append("%");
         }
-
-        out.println(sb.toString());
+        out.println(sb);
     }
private static String formatDouble(double input) {
@@ -544,7 +559,7 @@ public class CubeStatsReader {
                 new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)));
         for (CubeSegment seg : segments) {
             try {
-                new CubeStatsReader(seg, config).print(out);
+                new CubeStatsReader(seg, config, false).print(out);
             } catch (Exception e) {
                 logger.info("CubeStatsReader for Segment {} failed, skip it.", seg.getName());
             }
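For reference, getPreciseCuboidsSizeMap() above reports sizes in MB while the
segment persists them in bytes; handlePreciseCuboidsSize divides by 1024 * 1024
with a 1.0 factor to force floating-point division. A standalone sketch of the
same conversion (class and variable names here are illustrative, not from the
commit):

    import java.util.HashMap;
    import java.util.Map;

    class PreciseSizeConversion {
        static Map<Long, Double> toMegabytes(Map<Long, Long> bytesByCuboid) {
            Map<Long, Double> mbByCuboid = new HashMap<>();
            for (Map.Entry<Long, Long> e : bytesByCuboid.entrySet()) {
                // e.g. 5242880 bytes -> 5.0 MB
                mbByCuboid.put(e.getKey(), 1.0 * e.getValue() / (1024L * 1024L));
            }
            return mbByCuboid;
        }
    }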
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 71d2fd1a6a..f989a68258 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -43,7 +43,7 @@ public class CuboidRecommenderUtil {
             return null;
         }

-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
@@ -115,7 +115,7 @@ public class CuboidRecommenderUtil {
             return null;
         }

-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index a5fbe2b75b..9cf50f7c1c 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil {
         List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY);
         Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(statistics.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, Double> pSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(pSegment, statistics,
                     pReader.sourceRowCount);
             for (Long pCuboid : statistics.keySet()) {
@@ -102,7 +102,7 @@ public class CuboidStatsReaderUtil {
         Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowHLLCounters();
             if (pHLLMap == null || pHLLMap.isEmpty()) {
                 logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled.");
@@ -147,7 +147,7 @@ public class CuboidStatsReaderUtil {
             return null;
         }

-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index ecde4aa7ba..9a6a550682 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,7 +67,7 @@ public class MapReduceUtil {
logger.info("Having per reduce MB " + perReduceInputMB + ", reduce
count ratio " + reduceCountRatio + ", level "
+ level);
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment,
cuboidScheduler, kylinConfig);
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment,
cuboidScheduler, kylinConfig, true);
double parentLayerSizeEst, currentLayerSizeEst,
adjustedCurrentLayerSizeEst;
@@ -111,7 +111,7 @@ public class MapReduceUtil {
             throws IOException {
         KylinConfig kylinConfig = cubeSeg.getConfig();

-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig, true).getCuboidSizeMap();
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index dc96c56c4a..3974b652a8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -19,11 +19,11 @@
package org.apache.kylin.engine.mr.common;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -39,7 +39,7 @@ public class StatisticsDecisionUtil {
     protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class);

     public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException {
-        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig());
+        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig(), true);
         decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(),
                 cubeStats.getMapperNumberOfFirstBuild());
     }
@@ -95,15 +95,14 @@ public class StatisticsDecisionUtil {
     // For triggering cube planner phase one
     public static Map<Long, Long> optimizeCubingPlan(CubeSegment segment) throws IOException {
-        if (isAbleToOptimizeCubingPlan(segment)) {
-            logger.info("It's able to trigger cuboid planner algorithm.");
-        } else {
-            return new HashMap<>();
+        if (!isAbleToOptimizeCubingPlan(segment)) {
+            return Maps.newHashMap();
         }
+        logger.info("It's able to trigger cuboid planner algorithm.");

         Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment);
         if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) {
-            return new HashMap<>();
+            return Maps.newHashMap();
         }

         CubeInstance cube = segment.getCubeInstance();
@@ -115,8 +114,9 @@ public class StatisticsDecisionUtil {
     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
         CubeInstance cube = segment.getCubeInstance();
-        if (!cube.getConfig().isCubePlannerEnabled())
+        if (!cube.getConfig().isCubePlannerEnabled()) {
             return false;
+        }

         if (cube.getSegments(SegmentStatusEnum.READY_PENDING).size() > 0) {
             logger.info("Has read pending segments and will not enable cube planner.");
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 838c8233df..d24ecd480c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -371,16 +371,18 @@ public class CubeManager implements IRealizationProvider {
     }

     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry, boolean isLocal) throws IOException {
-        if (update == null || update.getCubeInstance() == null)
+        if (update == null || update.getCubeInstance() == null) {
             throw new IllegalStateException();
+        }

         CubeInstance cube = update.getCubeInstance();
         logger.info("Updating cube instance '{}'", cube.getName());

         Segments<CubeSegment> newSegs = (Segments) cube.getSegments().clone();

-        if (update.getToAddSegs() != null)
+        if (update.getToAddSegs() != null) {
             newSegs.addAll(Arrays.asList(update.getToAddSegs()));
+        }

         List<String> toRemoveResources = Lists.newArrayList();
         if (update.getToRemoveSegs() != null) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 6f54e1eb98..b6d134a079 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,7 +18,9 @@
package org.apache.kylin.cube;
+import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
@@ -30,9 +32,12 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.annotation.Clarification;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -139,6 +144,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();

+    @JsonProperty("cuboid_statics_rows_bytes")
+    private byte[] cuboidStaticsRowsBytes;
+
+    @JsonProperty("cuboid_statics_size_bytes")
+    private byte[] cuboidStaticsSizeBytes;
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
// lazy init
@@ -690,6 +701,53 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         copy.dimensionRangeInfoMap = other.dimensionRangeInfoMap == null ? null
                 : Maps.newHashMap(other.dimensionRangeInfoMap);
         copy.binarySignature = other.binarySignature;
+        copy.cuboidStaticsRowsBytes = other.cuboidStaticsRowsBytes;
+        copy.cuboidStaticsSizeBytes = other.cuboidStaticsSizeBytes;
         return copy;
     }
+
+    public void setCuboidStaticsRowsBytes(Map<Long, Long> staticsOfRows) {
+        this.cuboidStaticsRowsBytes = compressedCuboids(staticsOfRows);
+    }
+
+    public void setCuboidStaticsSizeBytes(Map<Long, Long> staticsOfSize) {
+        this.cuboidStaticsSizeBytes = compressedCuboids(staticsOfSize);
+    }
+
+    public Map<Long, Long> getCuboidStaticsRows() {
+        return this.decompressCuboids(this.cuboidStaticsRowsBytes);
+    }
+
+    public Map<Long, Long> getCuboidStaticsSize() {
+        return this.decompressCuboids(this.cuboidStaticsSizeBytes);
+    }
+
+    private Map<Long, Long> decompressCuboids(byte[] cuboidStaticsBytes) {
+        if (cuboidStaticsBytes == null) {
+            return null;
+        }
+        byte[] uncompressed;
+        try {
+            uncompressed = CompressionUtils.decompress(cuboidStaticsBytes);
+            String str = new String(uncompressed, StandardCharsets.UTF_8);
+            TypeReference<Map<Long, Long>> typeRef = new TypeReference<Map<Long, Long>>() {};
+            Map<Long, Long> cuboids = JsonUtil.readValue(str, typeRef);
+            return cuboids.isEmpty() ? null : cuboids;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] compressedCuboids(Map<Long, Long> cuboids) {
+        if (cuboids == null || cuboids.isEmpty()) {
+            return null;
+        }
+
+        try {
+            String str = JsonUtil.writeValueAsString(cuboids);
+            return CompressionUtils.compress(str.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
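Note on the persistence format above: CubeSegment now stores the precise
statistics as two compressed JSON blobs (cuboid_statics_rows_bytes and
cuboid_statics_size_bytes), keeping the segment metadata compact. Below is a
self-contained sketch of the same round trip, with GZIP and a plain Jackson
ObjectMapper standing in for Kylin's CompressionUtils and JsonUtil (which use
their own compression scheme), so treat it as illustrative rather than the
exact on-disk format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class CuboidStatsRoundTrip {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Map<cuboid id, count> -> JSON -> compressed bytes
        static byte[] compress(Map<Long, Long> cuboids) throws Exception {
            String json = MAPPER.writeValueAsString(cuboids);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return bos.toByteArray();
        }

        // compressed bytes -> JSON -> Map<cuboid id, count>
        static Map<Long, Long> decompress(byte[] bytes) throws Exception {
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
                return MAPPER.readValue(gzip, new TypeReference<Map<Long, Long>>() {});
            }
        }
    }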
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
index a5585f6d89..07e2030f7f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -289,6 +289,7 @@ public class DefaultCuboidScheduler extends CuboidScheduler {
      * @param agg agg group
      * @return cuboidId list
      */
+    @Override
     public Set<Long> calculateCuboidsForAggGroup(AggregationGroup agg) {
         Set<Long> cuboidHolder = new HashSet<>();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index 64624cad89..7d71c34388 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -83,10 +83,10 @@ public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializabl
     @Override
     public Dictionary<String> getDictionary(TblColRef col) {
-//        if (seg == null)
+        if (dictionaryMap == null) {
+            return null;
+        }
         return dictionaryMap.get(col);
-//        else
-//            return seg.getDictionary(col);
     }
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
index f67cf454e7..d6e9532e1e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
@@ -109,8 +109,9 @@ public class CuratorLeaderSelector extends LeaderSelectorListenerAdapter impleme
             logger.error("Other exception occurred when initialization DefaultScheduler:", th);
         } finally {
             logger.warn(this.name + " relinquishing leadership.");
-            if (defaultScheduler != null)
+            if (defaultScheduler != null) {
                 defaultScheduler.shutdown();
+            }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index 1a2b7d7e76..a3c17fa71e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -109,8 +109,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         for (int i = this.size() - 1; i >= 0; i--) {
             T seg = this.get(i);
             if (seg.getLastBuildTime() > 0) {
-                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime())
+                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime()) {
                     latest = seg;
+                }
             }
         }
         return latest;
@@ -277,10 +278,10 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
             }
         }

-        if (result.size() <= 1)
+        if (result.size() <= 1) {
             return null;
-        else
-            return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
+        }
+        return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
     }
/**
@@ -295,8 +296,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         if (newSegment != null && !tobe.contains(newSegment)) {
             tobe.add(newSegment);
         }
-        if (tobe.size() == 0)
+        if (tobe.size() == 0) {
             return tobe;
+        }

         // sort by source offset
         Collections.sort(tobe);
@@ -468,7 +470,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
     public Pair<Boolean, Boolean> fitInSegments(ISegment newOne) {
         if (this.isEmpty()) {
-             return Pair.newPair(false, false);
+            return Pair.newPair(false, false);
         }

         ISegment first = this.get(0);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 8a412d5d53..a0598f818f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -74,12 +74,13 @@ import java.util.stream.Collectors;
 public class OptimizeBuildJob extends SparkApplication {
     private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class);

-    private Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
     protected static String TEMP_DIR_SUFFIX = "_temp";

     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows;
+    Map<Long, Long> cuboidIdToPreciseSize;

     private Configuration conf = HadoopUtil.getCurrentConfiguration();
     private CubeManager cubeManager;
@@ -105,6 +106,9 @@ public class OptimizeBuildJob extends SparkApplication {
         optSeg = cubeInstance.getSegmentById(segmentId);
         originalSeg = cubeInstance.getOriginalSegmentToOptimize(optSeg);
         originalSegInfo = ManagerHub.getSegmentInfo(config, cubeId, originalSeg.getUuid());
+        // copy the original statistics
+        cuboidIdToPreciseSize = originalSeg.getCuboidStaticsSize();
+        cuboidIdToPreciseRows = originalSeg.getCuboidStaticsRows();

         calculateCuboidFromBaseCuboid();
         buildCuboidFromParent(cubeId);
@@ -150,10 +154,11 @@ public class OptimizeBuildJob extends SparkApplication {
         SpanningTree spanningTree;
         ParentSourceChooser sourceChooser;
         try {
-            spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts()));
+            Collection<LayoutEntity> cuboids = JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts());
+            spanningTree = new ForestSpanningTree(cuboids);
             logger.info("There are {} cuboids to be built in segment {}.", optSegInfo.toBuildLayouts().size(),
                     optSegInfo.name());
-            for (LayoutEntity cuboid : JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())) {
+            for (LayoutEntity cuboid : cuboids) {
                 logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
                         Joiner.on(", ").join(cuboid.getOrderedDimensions().keySet()));
             }
@@ -205,6 +210,10 @@ public class OptimizeBuildJob extends SparkApplication {
         }
         optSeg.setCuboidShardNums(cuboidShardNum);
         optSeg.setInputRecordsSize(originalSeg.getInputRecordsSize());
+
+        optSeg.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
+        optSeg.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+
         Map<String, String> additionalInfo = optSeg.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         optSeg.setAdditionalInfo(additionalInfo);
@@ -407,6 +416,8 @@ public class OptimizeBuildJob extends SparkApplication {
         cuboidShardNum.put(layoutId, (short) shardNum);
         JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
     }

     private void updateExistingLayout(LayoutEntity layout, long parentId) throws IOException {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 86e256f4ac..11291b3698 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -112,8 +112,8 @@ public class UpdateMetadataUtil {
             toUpdateSeg.getSnapshots().putAll(origSeg.getSnapshots());
             toUpdateSeg.getRowkeyStats().addAll(origSeg.getRowkeyStats());

-            CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config);
-            CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config);
+            CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config, true);
+            CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config, true);
             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
             if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
                 logger.warn(
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
index c2b59e7e58..e142f73163 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
@@ -85,11 +85,10 @@ public class CubeMergeAssist implements Serializable {
                 if (mergeDataset == null) {
                     mergeDataset = layoutDataset;
-                } else
+                } else {
                     mergeDataset = mergeDataset.union(layoutDataset);
+                }
             }
         return mergeDataset;
-
     }
-
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index a5f0e11403..f7b528f27c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -89,6 +89,8 @@ public class CubeBuildJob extends SparkApplication {
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
     private Map<Long, Long> recommendCuboidMap = new HashMap<>();
public static void main(String[] args) {
@@ -124,7 +126,7 @@ public class CubeBuildJob extends SparkApplication {
                 long startMills = System.currentTimeMillis();
                 spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(statisticsSeg.toBuildLayouts()));
                 sourceChooser = new ParentSourceChooser(spanningTree, statisticsSeg, newSegment, jobId, ss, config, false);
-                sourceChooser.setNeedStatistics();
+                sourceChooser.toStatistics();
                 sourceChooser.decideFlatTableSource(null);
                 Map<Long, HLLCounter> hllMap = new HashMap<>();
                 for (Tuple2<Object, AggInfo> cuboidData : sourceChooser.aggInfo()) {
@@ -231,6 +233,8 @@ public class CubeBuildJob extends SparkApplication {
                 long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId()));
                 deviation = diff / (layoutEntity.getRows() + 0.0d);
             }
+
+            collectPreciseStatics(cuboidIdToPreciseRows, cuboidIdToPreciseSize, layoutEntity);
             cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(),
                     layoutEntity.getRows(), layoutEntity.getByteSize(), deviation));
         }
@@ -258,6 +262,8 @@ public class CubeBuildJob extends SparkApplication {
         segment.setInputRecords(sourceRowCount);
         segment.setSnapshots(new ConcurrentHashMap<>(segmentInfo.getSnapShot2JavaMap()));
         segment.setCuboidShardNums(cuboidShardNum);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
@@ -266,6 +272,13 @@ public class CubeBuildJob extends SparkApplication {
         cubeManager.updateCube(update, true);
     }

+    private void collectPreciseStatics(Map<Long, Long> cuboidIdToPreciseRows, Map<Long, Long> cuboidIdToPreciseSize,
+            LayoutEntity layoutEntity) {
+        // Hold the precise statics of cuboid in segment
+        cuboidIdToPreciseRows.put(layoutEntity.getId(), layoutEntity.getRows());
+        cuboidIdToPreciseSize.put(layoutEntity.getId(), layoutEntity.getByteSize());
+    }
+
     private void collectPersistedTablePath(List<String> persistedFlatTable, ParentSourceChooser sourceChooser) {
         String flatTablePath = sourceChooser.persistFlatTableIfNecessary();
         if (!flatTablePath.isEmpty()) {
@@ -279,8 +292,9 @@ public class CubeBuildJob extends SparkApplication {
         CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
         CubeUpdate update = new CubeUpdate(cubeCopy);

-        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
+        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) {
             update.setCuboids(recommendCuboidMap);
+        }

         List<CubeSegment> cubeSegments = Lists.newArrayList();
         for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) {
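Taken together, CubeBuildJob now records exact row counts and byte sizes per
built layout and stores both maps on the segment, which is what later lets
CubeStatsReader serve precise numbers. A condensed sketch of that flow (the
builtLayouts list and the PreciseStatsCollector wrapper are illustrative;
LayoutEntity and CubeSegment are the types used by this diff, their imports
omitted since package paths vary by module):

    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.Maps;

    class PreciseStatsCollector {
        static void persist(List<LayoutEntity> builtLayouts, CubeSegment segment) {
            Map<Long, Long> rows = Maps.newConcurrentMap();
            Map<Long, Long> sizes = Maps.newConcurrentMap();
            for (LayoutEntity layout : builtLayouts) {
                rows.put(layout.getId(), layout.getRows());       // exact row count per cuboid
                sizes.put(layout.getId(), layout.getByteSize());  // exact size in bytes per cuboid
            }
            segment.setCuboidStaticsRowsBytes(rows);  // persisted as compressed JSON
            segment.setCuboidStaticsSizeBytes(sizes);
        }
    }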
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 16fecd385c..6d7e10e1df 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -61,6 +61,8 @@ public class CubeMergeJob extends SparkApplication {
     private List<CubeSegment> mergingSegments = Lists.newArrayList();
     private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
@Override
protected void doExecute() throws Exception {
@@ -134,7 +136,6 @@ public class CubeMergeJob extends SparkApplication {
         for (int i = 0; i < cuboids.size(); i++) {
             LayoutEntity cuboid = cuboids.apply(i);
             long layoutId = cuboid.getId();
-
             CubeMergeAssist assist = mergeCuboidsAssist.get(layoutId);
             if (assist == null) {
                 assist = new CubeMergeAssist();
@@ -191,7 +192,8 @@ public class CubeMergeJob extends SparkApplication {
         JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
-
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
         return layout;
     }
@@ -202,9 +204,9 @@ public class CubeMergeJob extends SparkApplication {
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeSegment segment = cubeCopy.getSegmentById(segmentId);

-        long totalSourceSize = 0l;
-        long totalInputRecords = 0l;
-        long totalInputRecordsSize = 0l;
+        long totalSourceSize = 0L;
+        long totalInputRecords = 0L;
+        long totalInputRecordsSize = 0L;
         for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
             totalSourceSize += assist.getLayout().getByteSize();
         }
@@ -221,6 +223,8 @@ public class CubeMergeJob extends SparkApplication {
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         cubeSegments.add(segment);
         update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
         cubeManager.updateCube(update, true);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 0f77c768a9..a82a82144d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -50,15 +50,20 @@ class ParentSourceChooser(
   // build from flatTable.
   var flatTableSource: NBuildSourceInfo = _

-  private var needStatistics = false
+  private var needStatistics: Boolean = false

   //TODO: MetadataConverter don't have getCubeDesc() now
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
     MetadataConverter.getCubeDesc(segInfo.getCube),
     ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree))*/

-  def setNeedStatistics(): Unit =
-    needStatistics = true
+  def toStatistics(): Unit = {
+    this.needStatistics = true
+  }
+
+  def cancelStatistics(): Unit = {
+    this.needStatistics = false
+  }

   def getAggInfo : Array[(Long, AggInfo)] = aggInfo
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
index 5e5789c3eb..6fdea85947 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
@@ -110,7 +110,7 @@ public class NOptimizeJobTest extends LocalWithSparkSessionTest {
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         for (CubeSegment segment : cube.getSegments()) {
             Assert.assertEquals(SegmentStatusEnum.READY, segment.getStatus());
-            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config);
+            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config, true);
             Assert.assertEquals(recommendCuboids, segStatsReader.getCuboidRowHLLCounters().keySet());
             String cuboidPath = PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier());
             Assert.assertTrue(fs.exists(new Path(cuboidPath)));