Repository: kylin Updated Branches: refs/heads/master 2edee4915 -> e21fe2acb
KYLIN-2802 Enable cube planner phase one Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0bc691e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0bc691e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0bc691e Branch: refs/heads/master Commit: f0bc691ecff2a8f92a3a43e49c5c7b1b676d6d10 Parents: 7b6606a Author: Zhong <nju_y...@apache.org> Authored: Fri Sep 15 13:27:27 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Sat Sep 23 18:16:48 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 27 +++ .../org/apache/kylin/common/util/JsonUtil.java | 3 +- .../org/apache/kylin/cube/CubeInstance.java | 39 ++++- .../java/org/apache/kylin/cube/CubeManager.java | 4 + .../java/org/apache/kylin/cube/CubeUpdate.java | 11 ++ .../cuboid/algorithm/CuboidRecommender.java | 163 +++++++++++++++++++ .../cube/cuboid/algorithm/CuboidStats.java | 27 ++- .../cube/cuboid/algorithm/CuboidStatsUtil.java | 134 +++++++++------ .../org/apache/kylin/cube/CubeInstanceTest.java | 57 +++++++ .../cuboid/algorithm/CuboidStatsUtilTest.java | 45 +++-- .../src/test/resources/learn_kylin_cube2.json | 84 ++++++++++ .../kylin/engine/mr/common/CubeStatsReader.java | 4 + .../engine/mr/common/CuboidRecommenderUtil.java | 106 ++++++++++++ .../engine/mr/common/CuboidStatsReaderUtil.java | 135 +++++++++++++++ .../mr/common/StatisticsDecisionUtil.java | 110 +++++++++++++ .../engine/mr/steps/SaveStatisticsStep.java | 55 +------ 16 files changed, 880 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d535b7d..eb2adab 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1224,6 +1224,33 @@ abstract public class KylinConfigBase implements Serializable { } // ============================================================================ + // Cube Planner + // ============================================================================ + public boolean isCubePlannerEnabledForExistingCube() { + return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false")); + } + + public double getCubePlannerExpansionRateThreshold() { + return Double.parseDouble(getOptional("kylin.cube.cubeplanner.expansion-threshold", "15.0")); + } + + public int getCubePlannerRecommendCuboidCacheMaxSize() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.recommend-cache-max-size", "200")); + } + + public long getCubePlannerMandatoryRollUpThreshold() { + return Long.parseLong(getOptional("kylin.cube.cubeplanner.mandatory-rollup-threshold", "1000")); + } + + public int getCubePlannerAgreedyAlgorithmAutoThreshold() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-greedy", "10")); + } + + public int getCubePlannerGeneticAlgorithmAutoThreshold() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23")); + } + + // ============================================================================ // RESTCLIENT // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java ---------------------------------------------------------------------- diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java index 5a81463..01fd134 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java @@ -72,7 +72,8 @@ public class JsonUtil { return mapper.readValue(src, valueType); } - public static <T> T readValue(String content, TypeReference<T> valueTypeRef) throws IOException { + public static <T> T readValue(String content, TypeReference<T> valueTypeRef) + throws IOException, JsonParseException, JsonMappingException { return mapper.readValue(content, valueTypeRef); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 7d539c6..a611b96 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -19,7 +19,6 @@ package org.apache.kylin.cube; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +29,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.cuboid.TreeCuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; @@ -328,7 +328,36 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, this.createTimeUTC = createTimeUTC; } - Map<Long, Long> getCuboids() { 
+ public Set<Long> getCuboidsByMode(String cuboidModeName) { + return getCuboidsByMode(CuboidModeEnum.getByModeName(cuboidModeName)); + } + + public Set<Long> getCuboidsByMode(CuboidModeEnum cuboidMode) { + if (cuboidMode == null || cuboidMode == CuboidModeEnum.CURRENT) { + return getCuboidScheduler().getAllCuboidIds(); + } + Set<Long> cuboidsRecommend = getCuboidsRecommend(); + if (cuboidsRecommend == null || cuboidMode == CuboidModeEnum.RECOMMEND) { + return cuboidsRecommend; + } + Set<Long> currentCuboids = getCuboidScheduler().getAllCuboidIds(); + switch (cuboidMode) { + case RECOMMEND_EXISTING: + cuboidsRecommend.retainAll(currentCuboids); + return cuboidsRecommend; + case RECOMMEND_MISSING: + cuboidsRecommend.removeAll(currentCuboids); + return cuboidsRecommend; + case RECOMMEND_MISSING_WITH_BASE: + cuboidsRecommend.removeAll(currentCuboids); + cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId()); + return cuboidsRecommend; + default: + return null; + } + } + + public Map<Long, Long> getCuboids() { if (cuboidBytes == null) return null; byte[] uncompressed; @@ -344,7 +373,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } } - void setCuboids(Map<Long, Long> cuboids) { + public void setCuboids(Map<Long, Long> cuboids) { if (cuboids == null) return; if (cuboids.isEmpty()) { @@ -361,7 +390,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } } - Set<Long> getCuboidsRecommend() { + public Set<Long> getCuboidsRecommend() { if (cuboidBytesRecommend == null) return null; byte[] uncompressed; @@ -377,7 +406,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } } - void setCuboidsRecommend(HashSet<Long> cuboids) { + public void setCuboidsRecommend(Set<Long> cuboids) { if (cuboids == null) return; if (cuboids.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 425e32f..d4d88fd 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -418,6 +418,10 @@ public class CubeManager implements IRealizationProvider { cube.setCost(update.getCost()); } + if(update.getCuboids() != null){ + cube.setCuboids(update.getCuboids()); + } + try { getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER); } catch (IllegalStateException ise) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java index dffaa48..fae20dc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java @@ -18,6 +18,8 @@ package org.apache.kylin.cube; +import java.util.Map; + import org.apache.kylin.metadata.realization.RealizationStatusEnum; /** @@ -31,6 +33,7 @@ public class CubeUpdate { private RealizationStatusEnum status; private String owner; private int cost = -1; + private Map<Long, Long> cuboids = null; public CubeUpdate(CubeInstance cubeInstance) { this.cubeInstance = cubeInstance; @@ -98,4 +101,12 @@ public class CubeUpdate { this.cost = cost; return this; } + + public Map<Long, Long> getCuboids() { + return cuboids; + } + + public void setCuboids(Map<Long, Long> cuboids) { + this.cuboids = cuboids; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java new file mode 100644 index 0000000..43b2318 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.cube.cuboid.algorithm; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.cuboid.algorithm.generic.GeneticAlgorithm; +import org.apache.kylin.cube.cuboid.algorithm.greedy.GreedyAlgorithm; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Maps; + +public class CuboidRecommender { + private static final Logger logger = LoggerFactory.getLogger(CuboidRecommender.class); + + private static Cache<String, Map<Long, Long>> cuboidRecommendCache = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, Map<Long, Long>>() { + @Override + public void onRemoval(RemovalNotification<String, Map<Long, Long>> notification) { + logger.info("Dict with resource path " + notification.getKey() + " is removed due to " + + notification.getCause()); + } + }).maximumSize(KylinConfig.getInstanceFromEnv().getCubePlannerRecommendCuboidCacheMaxSize()) + .expireAfterWrite(1, TimeUnit.DAYS).build(); + + private class CuboidRecommenderSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + cuboidRecommendCache.invalidateAll(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + cuboidRecommendCache.invalidate(cacheKey); + } + } + + public CuboidRecommender() { + Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()).registerListener(new 
CuboidRecommenderSyncListener(), + "cube"); + } + + private static CuboidRecommender instance = new CuboidRecommender(); + + public static CuboidRecommender getInstance() { + return instance; + } + + public Map<Long, Long> getRecommendCuboidList(final CuboidStats cuboidStats, final KylinConfig kylinConfig) { + if (cuboidStats == null) { + return null; + } + final String key = cuboidStats.getKey(); + Map<Long, Long> results = cuboidRecommendCache.getIfPresent(key); + if (results == null) { + try { + results = cuboidRecommendCache.get(key, new Callable<Map<Long, Long>>() { + @Override + public Map<Long, Long> call() throws Exception { + // recommending flag + Map<Long, Long> emptyMap = Maps.newHashMap(); + cuboidRecommendCache.put(key, emptyMap); + try { + Map<Long, Long> recommendCuboid = getRecommendCuboidList(cuboidStats, kylinConfig, + true); + + if (recommendCuboid != null) { + logger.info("Add recommend cuboids for " + key + " to cache"); + cuboidRecommendCache.put(key, recommendCuboid); + } + + return recommendCuboid; + } catch (Exception e) { + cuboidRecommendCache.invalidate(key); + logger.error("Failed to get recommend cuboids for " + key + " in cache", e); + throw e; + } + } + }); + } catch (ExecutionException e) { + logger.error("Failed to get recommend cuboids for " + key, e); + } + } + return results; + } + + public Map<Long, Long> getRecommendCuboidList(CuboidStats cuboidStats, KylinConfig kylinConf, + boolean ifForceRecommend) { + long Threshold1 = 1L << kylinConf.getCubePlannerAgreedyAlgorithmAutoThreshold(); + long Threshold2 = 1L << kylinConf.getCubePlannerGeneticAlgorithmAutoThreshold(); + if (Threshold1 >= Threshold2) { + logger.error("Invalid Cube Planner Algorithm configuration"); + return null; + } + + int allCuboidCount = cuboidStats.getAllCuboidsForMandatory().size() + + cuboidStats.getAllCuboidsForSelection().size(); + + BenefitPolicy benefitPolicy = new PBPUSCalculator(cuboidStats); + CuboidRecommendAlgorithm algorithm = null; + + if
(allCuboidCount <= Threshold2) { + algorithm = new GreedyAlgorithm(-1, benefitPolicy, cuboidStats); + } else { + algorithm = new GeneticAlgorithm(-1, benefitPolicy, cuboidStats); + } + + if (!ifForceRecommend && allCuboidCount <= Threshold1) { + return null; + } + + long startTime = System.currentTimeMillis(); + logger.info("Cube Planner Algorithm started at " + startTime); + List<Long> recommendCuboidList = algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold()); + logger.info("Cube Planner Algorithm ended, took " + (System.currentTimeMillis() - startTime) + " ms"); + + if (recommendCuboidList.size() < allCuboidCount) { + logger.info("Cube Planner Algorithm chooses " + recommendCuboidList.size() + + " most effective cuboids to build among of all " + allCuboidCount + " cuboids."); + } + + Map<Long, Long> recommendCuboidsWithStats = Maps.newLinkedHashMap(); + for (Long cuboid : recommendCuboidList) { + if (cuboid.equals(cuboidStats.getBaseCuboid())) { + recommendCuboidsWithStats.put(cuboid, cuboidStats.getCuboidCount(cuboid)); + } else if (cuboidStats.getAllCuboidsForSelection().contains(cuboid)) { + recommendCuboidsWithStats.put(cuboid, cuboidStats.getCuboidCount(cuboid)); + } else { + recommendCuboidsWithStats.put(cuboid, -1L); + } + } + return recommendCuboidsWithStats; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java index 1775d5a..a1c191e 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java @@ -18,6 +18,7 @@ package org.apache.kylin.cube.cuboid.algorithm; +import java.util.List; import
java.util.Map; import java.util.Set; @@ -120,7 +121,8 @@ public class CuboidStats { private ImmutableMap<Long, Double> cuboidHitProbabilityMap; private ImmutableMap<Long, Long> cuboidScanCountMap; - private ImmutableMap<Long, Set<Long>> allDescendantsCache; + private ImmutableMap<Long, List<Long>> directChildrenCache; + private Map<Long, Set<Long>> allDescendantsCache; private CuboidStats(String key, long baseCuboidId, Set<Long> mandatoryCuboids, Map<Long, Long> statistics, Map<Long, Double> size, Map<Long, Long> hitFrequencyMap, Map<Long, Map<Long, Long>> scanCountSourceMap) { @@ -187,8 +189,10 @@ public class CuboidStats { } this.cuboidScanCountMap = ImmutableMap.<Long, Long> builder().putAll(tmpCuboidScanCountMap).build(); - this.allDescendantsCache = ImmutableMap.<Long, Set<Long>> builder() - .putAll(CuboidStatsUtil.createAllDescendantsCache(statistics.keySet())).build(); + this.directChildrenCache = ImmutableMap.<Long, List<Long>> builder() + .putAll(CuboidStatsUtil.createDirectChildrenCache(statistics.keySet())).build(); + + this.allDescendantsCache = Maps.newConcurrentMap(); } private long getExpScanCount(long sourceCuboid, Map<Long, Long> statistics, @@ -216,11 +220,26 @@ public class CuboidStats { public Set<Long> getAllDescendants(long cuboid) { Set<Long> allDescendants = Sets.newLinkedHashSet(); if (selectionCuboidSet.contains(cuboid)) { - return allDescendantsCache.get(cuboid); + if (allDescendantsCache.get(cuboid) != null) { + return allDescendantsCache.get(cuboid); + } else { + getAllDescendants(cuboid, allDescendants); + allDescendantsCache.put(cuboid, allDescendants); + } } return allDescendants; } + private void getAllDescendants(long cuboid, Set<Long> allDescendants) { + if (allDescendants.contains(cuboid)) { + return; + } + allDescendants.add(cuboid); + for (Long directChild : directChildrenCache.get(cuboid)) { + getAllDescendants(directChild, allDescendants); + } + } + public Set<Long> getAllCuboidsForSelection() { return selectionCuboidSet; } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java index 6d5bbe5..c2683df 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java @@ -20,13 +20,13 @@ package org.apache.kylin.cube.cuboid.algorithm; import java.util.Collections; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -102,62 +102,102 @@ public class CuboidStatsUtil { } } - /** Using dynamic programming to use extra space to reduce repetitive computation*/ - public static Map<Long, Set<Long>> createAllDescendantsCache(final Set<Long> cuboidSet) { - List<Long> latticeCuboidList = Lists.newArrayList(cuboidSet); - Collections.sort(latticeCuboidList); - - Map<Long, Set<Long>> allDescendantsCache = Maps.newHashMap(); - Set<Long> preNoneDescendants = Sets.newHashSet(); - for (int i = 0; i < latticeCuboidList.size(); i++) { - Long currentCuboid = latticeCuboidList.get(i); - Set<Long> descendants = Sets.newHashSet(currentCuboid); - Set<Long> curNoneDescendants = Sets.newHashSet(); - if (i > 0) { - long preCuboid = latticeCuboidList.get(i - 1); - if (isDescendant(preCuboid, currentCuboid)) { - descendants.addAll(allDescendantsCache.get(preCuboid)); - } else { - curNoneDescendants.add(preCuboid); - for (long cuboidToCheck : 
allDescendantsCache.get(preCuboid)) { - if (isDescendant(cuboidToCheck, currentCuboid)) { - descendants.addAll(allDescendantsCache.get(cuboidToCheck)); - } - } - } - } - for (long cuboidToCheck : preNoneDescendants) { - if (isDescendant(cuboidToCheck, currentCuboid)) { - descendants.addAll(allDescendantsCache.get(cuboidToCheck)); - } else { - curNoneDescendants.add(cuboidToCheck); + public static Map<Long, List<Long>> createDirectChildrenCache(final Set<Long> cuboidSet) { + /** + * Sort the list by ascending order: + * */ + final List<Long> cuboidList = Lists.newArrayList(cuboidSet); + Collections.sort(cuboidList); + /** + * Sort the list by ascending order: + * 1. the more bit count of its value, the bigger + * 2. the larger of its value, the bigger + * */ + List<Integer> layerIdxList = Lists.newArrayListWithExpectedSize(cuboidList.size()); + for (int i = 0; i < cuboidList.size(); i++) { + layerIdxList.add(i); + } + Collections.sort(layerIdxList, new Comparator<Integer>() { + @Override + public int compare(Integer i1, Integer i2) { + Long o1 = cuboidList.get(i1); + Long o2 = cuboidList.get(i2); + int nBitDiff = Long.bitCount(o1) - Long.bitCount(o2); + if (nBitDiff != 0) { + return nBitDiff; } + return Long.compare(o1, o2); } + }); + /** + * Construct an index array for pointing the position in layerIdxList + * (layerCuboidList is for speeding up continuous iteration) + * */ + int[] toLayerIdxArray = new int[layerIdxList.size()]; + final List<Long> layerCuboidList = Lists.newArrayListWithExpectedSize(cuboidList.size()); + for (int i = 0; i < layerIdxList.size(); i++) { + int cuboidIdx = layerIdxList.get(i); + toLayerIdxArray[cuboidIdx] = i; + layerCuboidList.add(cuboidList.get(cuboidIdx)); + } - allDescendantsCache.put(currentCuboid, descendants); - preNoneDescendants = curNoneDescendants; + int[] previousLayerLastIdxArray = new int[layerIdxList.size()]; + int currentBitCount = 0; + int previousLayerLastIdx = -1; + for (int i = 0; i < layerIdxList.size(); i++) { + 
int cuboidIdx = layerIdxList.get(i); + int nBits = Long.bitCount(cuboidList.get(cuboidIdx)); + if (nBits > currentBitCount) { + currentBitCount = nBits; + previousLayerLastIdx = i - 1; + } + previousLayerLastIdxArray[i] = previousLayerLastIdx; } - return allDescendantsCache; + Map<Long, List<Long>> directChildrenCache = Maps.newHashMap(); + for (int i = 0; i < cuboidList.size(); i++) { + Long currentCuboid = cuboidList.get(i); + LinkedList<Long> directChildren = Lists.newLinkedList(); + int lastLayerIdx = previousLayerLastIdxArray[toLayerIdxArray[i]]; + /** + * Choose one of the two scan strategies + * 1. cuboids are sorted by its value, like 1,2,3,4,... + * 2. cuboids are layered and sorted, like 1,2,4,8,...,3,5,... + * */ + if (i - 1 <= lastLayerIdx) { + /** + * 1. Adding cuboid by descending order + * */ + for (int j = i - 1; j >= 0; j--) { + checkAndAddDirectChild(directChildren, currentCuboid, cuboidList.get(j)); + } + } else { + /** + * 1. Adding cuboid by descending order + * 2. Check from lower cuboid layer + * */ + for (int j = lastLayerIdx; j >= 0; j--) { + checkAndAddDirectChild(directChildren, currentCuboid, layerCuboidList.get(j)); + } + } + directChildrenCache.put(currentCuboid, directChildren); + } + return directChildrenCache; } - @VisibleForTesting - static Map<Long, Set<Long>> createAllDescendantsCache2(final Set<Long> cuboidSet) { - List<Long> latticeCuboidList = Lists.newArrayList(cuboidSet); - - Map<Long, Set<Long>> allDescendantsCache = Maps.newHashMap(); - for (int i = 0; i < latticeCuboidList.size(); i++) { - Long currentCuboid = latticeCuboidList.get(i); - Set<Long> descendantSet = Sets.newHashSet(currentCuboid); - for (int j = 0; j < i; j++) { - Long checkCuboid = latticeCuboidList.get(j); - if (isDescendant(checkCuboid, currentCuboid)) { - descendantSet.add(checkCuboid); + private static void checkAndAddDirectChild(List<Long> directChildren, Long currentCuboid, Long checkedCuboid) { + if (isDescendant(checkedCuboid, currentCuboid)) { + 
boolean ifDirectChild = true; + for (long directChild : directChildren) { + if (isDescendant(checkedCuboid, directChild)) { + ifDirectChild = false; + break; } } - allDescendantsCache.put(currentCuboid, descendantSet); + if (ifDirectChild) { + directChildren.add(checkedCuboid); + } } - return allDescendantsCache; } public static boolean isDescendant(long cuboidToCheck, long parentCuboid) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java new file mode 100644 index 0000000..0af5f98 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.cube; + +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Map; + +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.cube.cuboid.TreeCuboidScheduler; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class CubeInstanceTest { + + private CubeInstance cubeInstance; + + @Before + public void setUp() throws Exception { + InputStream fileInputStream = new FileInputStream("src/test/resources/learn_kylin_cube2.json"); + JsonSerializer<CubeInstance> jsonSerializer = new JsonSerializer<>(CubeInstance.class); + cubeInstance = jsonSerializer.deserialize(new DataInputStream(fileInputStream)); + } + + @Test + public void getTreeCuboidSchedulerTest() { + Map<Long, Long> cuboidsWithRowCnt = cubeInstance.getCuboids(); + TreeCuboidScheduler.CuboidTree.createFromCuboids(Lists.newArrayList(cuboidsWithRowCnt.keySet()), + new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt)); + } + + public void printMap(Map<Long, Long> map) { + System.out.println("size: " + map.size()); + for (Map.Entry<Long, Long> entry : map.entrySet()) { + System.out.println(entry.getKey() + ":" + entry.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java index ca70820..cb2b699 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java @@ -19,14 +19,15 @@ package org.apache.kylin.cube.cuboid.algorithm; import 
java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -122,16 +123,16 @@ public class CuboidStatsUtilTest { } @Test - public void createAllDescendantsCacheTest() { + public void createDirectChildrenCacheTest() { Set<Long> cuboidSet = generateCuboidSet(); - Map<Long, Set<Long>> allDescendantsCache = CuboidStatsUtil.createAllDescendantsCache(cuboidSet); - - Assert.assertTrue(allDescendantsCache.get(255L).containsAll(cuboidSet)); - - Assert.assertTrue(allDescendantsCache.get(239L).size() == 5); - - Assert.assertTrue(allDescendantsCache.get(50L).containsAll(Sets.newHashSet(50L, 2L))); - Assert.assertTrue(!allDescendantsCache.get(50L).contains(4L)); + Map<Long, List<Long>> directChildrenCache = CuboidStatsUtil.createDirectChildrenCache(cuboidSet); + + Assert.assertTrue(directChildrenCache.get(255L).containsAll(Lists.newArrayList(239L, 159L, 50L))); + Assert.assertTrue(directChildrenCache.get(159L).contains(6L)); + Assert.assertTrue(directChildrenCache.get(50L).contains(2L)); + Assert.assertTrue(directChildrenCache.get(239L).contains(199L)); + Assert.assertTrue(directChildrenCache.get(199L).contains(6L)); + Assert.assertTrue(directChildrenCache.get(6L).containsAll(Lists.newArrayList(4L, 2L))); } private Set<Long> generateMassCuboidSet() { @@ -144,18 +145,26 @@ public class CuboidStatsUtilTest { } @Test - public void createAllDescendantsCacheStressTest() { + public void createDirectChildrenCacheStressTest() { Stopwatch sw = new Stopwatch(); sw.start(); Set<Long> cuboidSet = generateMassCuboidSet(); - System.out.println("Time elapsed for creating sorted cuboid list: " + sw.elapsed(TimeUnit.MILLISECONDS)); - sw.reset(); - sw.start(); - CuboidStatsUtil.createAllDescendantsCache(cuboidSet); - 
System.out.println("Time elapsed for creating descendants cache: " + sw.elapsed(TimeUnit.MILLISECONDS)); + System.out.println("Time elapsed for creating sorted cuboid list: " + sw.elapsedMillis()); sw.reset(); sw.start(); - CuboidStatsUtil.createAllDescendantsCache2(cuboidSet); - System.out.println("Time elapsed for creating descendants cache2: " + sw.elapsed(TimeUnit.MILLISECONDS)); + checkDirectChildrenCacheStressTest(CuboidStatsUtil.createDirectChildrenCache(cuboidSet)); + System.out.println("Time elapsed for creating direct children cache: " + sw.elapsedMillis()); + sw.stop(); + } + + private void checkDirectChildrenCacheStressTest(Map<Long, List<Long>> directChildrenCache) { + for (Map.Entry<Long, List<Long>> entry : directChildrenCache.entrySet()) { + if (Long.bitCount(entry.getKey()) == 1) { + Assert.assertTrue("Check for cuboid " + entry.getKey(), entry.getValue().size() == 0); + } else { + Assert.assertTrue("Check for cuboid " + entry.getKey(), + Long.bitCount(entry.getKey()) == entry.getValue().size()); + } + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/core-cube/src/test/resources/learn_kylin_cube2.json ---------------------------------------------------------------------- diff --git a/core-cube/src/test/resources/learn_kylin_cube2.json b/core-cube/src/test/resources/learn_kylin_cube2.json new file mode 100644 index 0000000..4d731bb --- /dev/null +++ b/core-cube/src/test/resources/learn_kylin_cube2.json @@ -0,0 +1,84 @@ +{ + "uuid" : "bd66b324-4b0d-4fc7-a2ef-2b9ddced160d", + "last_modified" : 1503388415029, + "version" : "2.1.0", + "name" : "learn_kylin_cube2", + "owner" : "ADMIN", + "descriptor" : "learn_kylin_cube2", + "cost" : 50, + "status" : "READY", + "segments" : [ { + "uuid" : "4d017947-1a63-47a9-b31d-356219612cf6", + "name" : "20100101000000_20150101000000", + "storage_location_identifier" : "KYLIN_FKF6CWSIZC", + "date_range_start" : 1262304000000, + "date_range_end" : 1420070400000, + "source_offset_start" : 0, + 
"source_offset_end" : 0, + "status" : "READY", + "size_kb" : 22153, + "input_records" : 10000, + "input_records_size" : 984072, + "last_build_time" : 1503388048037, + "last_build_job_id" : "06c5bfd9-0443-4c28-810b-bcbd37fde458", + "create_time_utc" : 1503387574466, + "cuboid_shard_nums" : { }, + "total_shards" : 1, + "blackout_cuboids" : [ ], + "binary_signature" : null, + "dictionaries" : { + "KYLIN_SALES.LSTG_SITE_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict", + "KYLIN_SALES.LEAF_CATEG_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict", + "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict", + "KYLIN_SALES.SLR_SEGMENT_CD" : "/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict", + "KYLIN_SALES.LSTG_FORMAT_NAME" : "/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict" + }, + "snapshots" : { + "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : "/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot", + "DEFAULT.KYLIN_CAL_DT" : "/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot" + }, + "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ "KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], [ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ] + }, { + "uuid" : 
"4cb8c3e7-9624-44c4-b812-041cb5604e50", + "name" : "20100101000000_20150101000000", + "storage_location_identifier" : "KYLIN_VEXWSEB5Y7", + "date_range_start" : 1262304000000, + "date_range_end" : 1420070400000, + "source_offset_start" : 0, + "source_offset_end" : 0, + "status" : "NEW", + "size_kb" : 0, + "input_records" : 0, + "input_records_size" : 0, + "last_build_time" : 0, + "last_build_job_id" : null, + "create_time_utc" : 1503388305176, + "cuboid_shard_nums" : { }, + "total_shards" : 0, + "blackout_cuboids" : [ ], + "binary_signature" : null, + "dictionaries" : { + "KYLIN_SALES.LSTG_SITE_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict", + "KYLIN_SALES.LEAF_CATEG_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict", + "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict", + "KYLIN_SALES.SLR_SEGMENT_CD" : "/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict", + "KYLIN_SALES.LSTG_FORMAT_NAME" : "/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict" + }, + "snapshots" : { + "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : "/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot", + "DEFAULT.KYLIN_CAL_DT" : "/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot" + }, + "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ "KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], [ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ] + } ], + "create_time_utc" : 1502959568985, + "cuboid_bytes" : "eAEdkTmWBTEIA+/iuAM24+Vq8+buv+TINEhCqP9GzDnuOebf6Bp3fmOPu76RMe7m8XG96huL76BRNm4yPz1uGS/0PhSdIG3lN9wQcg09llB9VEo31UyoPg2RMNQqDiQoxdOgwkEJlFrowiCEMzemCDMu8LnRXeBiaisG3yuJ2dm05uG2nG8p454yigDlKsgyYjqOzjPkBizNmSUyW6tc15hWSTgo3MGnp+JAYqd6R8zc2IyX3So5aJC1zgMAPTYpQwEdE9adgDkEqYXZ7hCJhYktJ1t/4glhb9NCEsXzg0kldceG0YtAXb/p5NK4KPdTLsVmkf8/QfNdrA==", + "cuboid_bytes_recommend" : null, + "last_optimized" : 0, + "size_kb" : 22153, + "input_records_count" : 10000, + "input_records_size" : 984072 +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 417697d..8f1da6e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -140,6 +140,10 @@ public class CubeStatsReader { return tempFile; } + public Map<Long, HLLCounter> getCuboidRowEstimatesHLLOrigin() { + return this.cuboidRowEstimatesHLL; + } + public Map<Long, Long> getCuboidRowEstimatesHLL() { return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java new file mode 100644 index 0000000..ba3f023 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.algorithm.CuboidRecommender; +import org.apache.kylin.cube.cuboid.algorithm.CuboidStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CuboidRecommenderUtil { + + private static final Logger logger = LoggerFactory.getLogger(CuboidRecommenderUtil.class); + + /** Trigger cube planner phase one */ + public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment) throws IOException { + if (segment == null) { + return null; + } + + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + if (cubeStatsReader.getCuboidRowEstimatesHLL() == null + || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { + logger.info("Cuboid Statistics is not enabled."); + return null; + } + long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + logger.info("Base cuboid count in cuboid statistics is 0."); + return null; + } + + String key = segment.getCubeInstance().getName(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), + cubeStatsReader.getCuboidSizeMap()).build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false); + } + + /** Trigger cube planner phase two for optimization */ + public static Map<Long, Long> getRecommendCuboidList(CubeInstance cube, Map<Long, Long> hitFrequencyMap, + Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException { + + Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsReaderUtil + 
.readCuboidStatsAndSizeFromCube(cube.getCuboidScheduler().getAllCuboidIds(), cube); + + String key = cube.getName(); + long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, statsPair.getFirst(), statsPair.getSecond()) + .setHitFrequencyMap(hitFrequencyMap).setRollingUpCountSourceMap(rollingUpCountSourceMap, + cube.getConfig().getCubePlannerMandatoryRollUpThreshold()) + .build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, cube.getConfig()); + } + + /** For future segment level recommend */ + public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment, Map<Long, Long> hitFrequencyMap, + Map<Long, Map<Long, Long>> rollingUpCountSourceMap, boolean ifForceRecommend) throws IOException { + if (segment == null) { + return null; + } + + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + if (cubeStatsReader.getCuboidRowEstimatesHLL() == null + || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { + logger.info("Cuboid Statistics is not enabled."); + return null; + } + long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + logger.info("Base cuboid count in cuboid statistics is 0."); + return null; + } + + String key = segment.getCubeInstance().getName() + "-" + segment.getName(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), + cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap) + .setRollingUpCountSourceMap(rollingUpCountSourceMap, + segment.getConfig().getCubePlannerMandatoryRollUpThreshold()) + .build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), + ifForceRecommend); + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java new file mode 100644 index 0000000..3025750 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class CuboidStatsReaderUtil { + + private static final Logger logger = LoggerFactory.getLogger(CuboidStatsReaderUtil.class); + + public static Map<Long, Long> readCuboidStatsFromCube(Set<Long> cuboidIds, CubeInstance cubeInstance) + throws IOException { + Map<Long, Long> statisticsMerged = readCuboidStatsAndSizeFromCube(cuboidIds, cubeInstance).getFirst(); + return statisticsMerged.isEmpty() ? null : statisticsMerged; + } + + public static Pair<Map<Long, Long>, Map<Long, Double>> readCuboidStatsAndSizeFromCube(Set<Long> cuboidIds, + CubeInstance cube) throws IOException { + Preconditions.checkNotNull(cuboidIds, "The cuboid set can not be null"); + Preconditions.checkNotNull(cube, "The cube instance can not be null"); + + List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY); + Map<Long, Long> statisticsMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size()); + Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size()); + readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged, sizeMerged); + return new Pair<>(statisticsMerged, sizeMerged); + } + + public static Map<Long, Long> readCuboidStatsFromSegments(Set<Long> cuboidIds, List<CubeSegment> segmentList) + throws IOException { + Map<Long, Long> statisticsMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size()); + readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged, + Maps.<Long, Double> 
newHashMapWithExpectedSize(cuboidIds.size())); + return statisticsMerged.isEmpty() ? null : statisticsMerged; + } + + private static void readCuboidStatsFromSegments(Set<Long> cuboidSet, List<CubeSegment> segmentList, + final Map<Long, Long> statisticsMerged, final Map<Long, Double> sizeMerged) throws IOException { + if (segmentList == null || segmentList.isEmpty()) { + return; + } + int nSegment = segmentList.size(); + + Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size()); + Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size()); + for (CubeSegment pSegment : segmentList) { + CubeStatsReader pReader = new CubeStatsReader(pSegment, pSegment.getConfig()); + Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowEstimatesHLLOrigin(); + if (pHLLMap == null || pHLLMap.isEmpty()) { + logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled."); + nSegment--; + continue; + } + Map<Long, Double> pSizeMap = pReader.getCuboidSizeMap(); + for (Long pCuboid : cuboidSet) { + HLLCounter pInnerHLL = pHLLMap.get(pCuboid); + Preconditions.checkNotNull(pInnerHLL, "statistics should exist for cuboid " + pCuboid + " of segment " + + pSegment.getCubeDesc().getName() + "[" + pSegment.getName() + "]"); + if (cuboidHLLMapMerged.get(pCuboid) != null) { + cuboidHLLMapMerged.get(pCuboid).merge(pInnerHLL); + } else { + cuboidHLLMapMerged.put(pCuboid, pInnerHLL); + } + + Double pSize = sizeMapMerged.get(pCuboid); + sizeMapMerged.put(pCuboid, pSize == null ? 
pSizeMap.get(pCuboid) : pSizeMap.get(pCuboid) + pSize); + } + } + + if (nSegment < 1) { + return; + } + for (Long pCuboid : cuboidSet) { + statisticsMerged.put(pCuboid, cuboidHLLMapMerged.get(pCuboid).getCountEstimate()); + sizeMerged.put(pCuboid, sizeMapMerged.get(pCuboid)); + } + } + + public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, CubeSegment cubeSegment) + throws IOException { + if (cubeSegment == null) { + logger.warn("The cube segment can not be " + null); + return null; + } + + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cubeSegment.getConfig()); + if (cubeStatsReader.getCuboidRowEstimatesHLL() == null + || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { + logger.info("Cuboid Statistics is not enabled."); + return null; + } + + Map<Long, Long> cuboidsWithStatsAll = cubeStatsReader.getCuboidRowEstimatesHLL(); + Map<Long, Long> cuboidsWithStats = Maps.newHashMapWithExpectedSize(cuboidIds.size()); + for (Long cuboid : cuboidIds) { + Long rowEstimate = cuboidsWithStatsAll.get(cuboid); + if (rowEstimate == null) { + logger.warn("Cannot get the row count stats for cuboid " + cuboid); + } else { + cuboidsWithStats.put(cuboid, rowEstimate); + } + } + return cuboidsWithStats; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java new file mode 100644 index 0000000..4145226 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatisticsDecisionUtil { + protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class); + + public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException { + CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig()); + decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(), + cubeStats.getMapperNumberOfFirstBuild()); + } + + public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg, double mapperOverlapRatio, + int mapperNumber) throws IOException { + KylinConfig kylinConf = seg.getConfig(); + String algPref = kylinConf.getCubeAlgorithm(); + 
CubingJob.AlgorithmEnum alg; + if (mapperOverlapRatio == 0) { // no source records + alg = CubingJob.AlgorithmEnum.INMEM; + } else if (CubingJob.AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) { + alg = CubingJob.AlgorithmEnum.INMEM; + } else if (CubingJob.AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) { + alg = CubingJob.AlgorithmEnum.LAYER; + } else { + int memoryHungryMeasures = 0; + for (MeasureDesc measure : seg.getCubeDesc().getMeasures()) { + if (measure.getFunction().getMeasureType().isMemoryHungry()) { + logger.info("This cube has memory-hungry measure " + measure.getFunction().getExpression()); + memoryHungryMeasures++; + } + } + + if (memoryHungryMeasures > 0) { + alg = CubingJob.AlgorithmEnum.LAYER; + } else if ("random".equalsIgnoreCase(algPref)) { // for testing + alg = new Random().nextBoolean() ? CubingJob.AlgorithmEnum.INMEM : CubingJob.AlgorithmEnum.LAYER; + } else { // the default + int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit(); + double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); + logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); + logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + + overlapThreshold); + + // in-mem cubing is good when + // 1) the cluster has enough mapper slots to run in parallel + // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage + alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)// + ? 
CubingJob.AlgorithmEnum.INMEM + : CubingJob.AlgorithmEnum.LAYER; + } + + } + logger.info("The cube algorithm for " + seg + " is " + alg); + + cubingJob.setAlgorithm(alg); + } + + public static void optimizeCubingPlan(CubeSegment segment) throws IOException { + CubeInstance cube = segment.getCubeInstance(); + List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); + if (readySegments.size() == 0 || (cube.getConfig().isCubePlannerEnabledForExistingCube() + && readySegments.size() == 1 && (readySegments.get(0).getSegRange().equals(segment.getSegRange())))) { + logger.info("It's able to trigger cuboid planner algorithm."); + } else { + return; + } + + Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment); + if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) { + return; + } + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setCuboids(recommendCuboidsWithStats); + CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f0bc691e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 28f99fb..7085cf0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -19,7 +19,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.Random; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -30,14 +29,12 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.HadoopUtil; import 
org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +68,15 @@ public class SaveStatisticsStep extends AbstractExecutable { // put the statistics to metadata store String statisticsFileName = newSegment.getStatisticsResourcePath(); rs.putResource(statisticsFileName, is, System.currentTimeMillis()); + + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment); + StatisticsDecisionUtil.optimizeCubingPlan(newSegment); } finally { IOUtils.closeStream(is); } - decideCubingAlgorithm(newSegment, kylinConf); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } catch (IOException e) { logger.error("fail to save cuboid statistics", e); @@ -84,47 +84,4 @@ public class SaveStatisticsStep extends AbstractExecutable { } } - private void decideCubingAlgorithm(CubeSegment seg, KylinConfig kylinConf) throws IOException { - String algPref = kylinConf.getCubeAlgorithm(); - AlgorithmEnum alg; - if (AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) { - alg = AlgorithmEnum.INMEM; - } else if (AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) { - alg = AlgorithmEnum.LAYER; - } else { - int memoryHungryMeasures = 0; - for (MeasureDesc measure : seg.getCubeDesc().getMeasures()) { - if 
(measure.getFunction().getMeasureType().isMemoryHungry()) { - logger.info("This cube has memory-hungry measure " + measure.getFunction().getExpression()); - memoryHungryMeasures++; - } - } - - if (memoryHungryMeasures > 0) { - alg = AlgorithmEnum.LAYER; - } else if ("random".equalsIgnoreCase(algPref)) { // for testing - alg = new Random().nextBoolean() ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; - } else { // the default - CubeStatsReader cubeStats = new CubeStatsReader(seg, kylinConf); - int mapperNumber = cubeStats.getMapperNumberOfFirstBuild(); - int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit(); - double mapperOverlapRatio = cubeStats.getMapperOverlapRatioOfFirstBuild(); - double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); - logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); - logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold); - - // in-mem cubing is good when - // 1) the cluster has enough mapper slots to run in parallel - // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage - alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)// - ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; - } - - } - logger.info("The cube algorithm for " + seg + " is " + alg); - - CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); - cubingJob.setAlgorithm(alg); - } - }