APACHE-KYLIN-2802: Enable cube planner phase one
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e75770f6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e75770f6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e75770f6 Branch: refs/heads/yaho-cube-planner Commit: e75770f656f04612ca4e131b226731e6663ccb07 Parents: 697f848 Author: Zhong <nju_y...@apache.org> Authored: Tue Aug 22 17:58:37 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Tue Aug 22 17:58:37 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 27 +++ .../org/apache/kylin/common/util/JsonUtil.java | 4 + .../org/apache/kylin/cube/CubeInstance.java | 19 ++- .../java/org/apache/kylin/cube/CubeManager.java | 4 + .../java/org/apache/kylin/cube/CubeUpdate.java | 11 ++ .../cuboid/algorithm/CuboidRecommender.java | 163 +++++++++++++++++++ .../org/apache/kylin/cube/CubeInstanceTest.java | 57 +++++++ .../src/test/resources/learn_kylin_cube2.json | 84 ++++++++++ .../kylin/engine/mr/common/CubeStatsReader.java | 4 + .../engine/mr/common/CuboidRecommenderUtil.java | 106 ++++++++++++ .../kylin/engine/mr/common/CuboidStatsUtil.java | 135 +++++++++++++++ .../mr/common/StatisticsDecisionUtil.java | 110 +++++++++++++ .../engine/mr/steps/SaveStatisticsStep.java | 55 +------ 13 files changed, 722 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6feb51a..d66d7ce 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1126,6 +1126,33 @@ abstract public class KylinConfigBase implements Serializable { } // ============================================================================ + // Cube Planner + // ============================================================================ + public boolean isCubePlannerEnabledForExistingCube() { + return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false")); + } + + public double getCubePlannerExpansionRateThreshold() { + return Double.parseDouble(getOptional("kylin.cube.cubeplanner.expansion-threshold", "15.0")); + } + + public int getCubePlannerRecommendCuboidCacheMaxSize() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.recommend-cache-max-size", "200")); + } + + public long getCubePlannerMandatoryRollUpThreshold() { + return Long.parseLong(getOptional("kylin.cube.cubeplanner.mandatory-rollup-threshold", "1000")); + } + + public int getCubePlannerAgreedyAlgorithmAutoThreshold() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-greedy", "10")); + } + + public int getCubePlannerGeneticAlgorithmAutoThreshold() { + return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23")); + } + + // ============================================================================ // Metrics // ============================================================================ public boolean isKylinMetricsMonitorEnabled() { http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java index f874b16..c30a1f2 100644 --- 
a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java @@ -92,6 +92,10 @@ public class JsonUtil { return typeMapper.readValue(src, valueType); } + public static <T> T readValue(String content, TypeReference<T> valueTypeRef) throws IOException { + return mapper.readValue(content, valueTypeRef); + } + public static void writeValueIndent(OutputStream out, Object value) throws IOException, JsonGenerationException, JsonMappingException { indentMapper.writeValue(out, value); http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 3f8bbbe..d217dcd 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -22,8 +22,6 @@ import static org.apache.kylin.cube.cuboid.CuboidModeEnum.CURRENT; import static org.apache.kylin.cube.cuboid.CuboidModeEnum.RECOMMEND; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,6 +57,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -359,21 +358,23 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } } - public HashMap<Long, Long> getCuboids() { + public Map<Long, Long> getCuboids() { if (cuboidBytes == null) return null; byte[] 
uncompressed; try { uncompressed = CompressionUtils.decompress(cuboidBytes); String str = new String(uncompressed, "UTF-8"); - HashMap<Long, Long> cuboids = JsonUtil.readValue(str, HashMap.class); + TypeReference<Map<Long, Long>> typeRef = new TypeReference<Map<Long, Long>>() { + }; + Map<Long, Long> cuboids = JsonUtil.readValue(str, typeRef); return cuboids.isEmpty() ? null : cuboids; } catch (Exception e) { throw new RuntimeException(e); } } - public void setCuboids(HashMap<Long, Long> cuboids) { + public void setCuboids(Map<Long, Long> cuboids) { if (cuboids == null) return; if (cuboids.isEmpty()) { @@ -390,21 +391,23 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } } - public HashSet<Long> getCuboidsRecommend() { + public Set<Long> getCuboidsRecommend() { if (cuboidBytesRecommend == null) return null; byte[] uncompressed; try { uncompressed = CompressionUtils.decompress(cuboidBytesRecommend); String str = new String(uncompressed, "UTF-8"); - HashSet<Long> cuboids = JsonUtil.readValue(str, HashSet.class); + TypeReference<Set<Long>> typeRef = new TypeReference<Set<Long>>() { + }; + Set<Long> cuboids = JsonUtil.readValue(str, typeRef); return cuboids.isEmpty() ? 
null : cuboids; } catch (Exception e) { throw new RuntimeException(e); } } - public void setCuboidsRecommend(HashSet<Long> cuboids) { + public void setCuboidsRecommend(Set<Long> cuboids) { if (cuboids == null) return; if (cuboids.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 739400a..ec43fe3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -421,6 +421,10 @@ public class CubeManager implements IRealizationProvider { cube.setCost(update.getCost()); } + if(update.getCuboids() != null){ + cube.setCuboids(update.getCuboids()); + } + try { getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER); } catch (IllegalStateException ise) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java index dffaa48..fae20dc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java @@ -18,6 +18,8 @@ package org.apache.kylin.cube; +import java.util.Map; + import org.apache.kylin.metadata.realization.RealizationStatusEnum; /** @@ -31,6 +33,7 @@ public class CubeUpdate { private RealizationStatusEnum status; private String owner; private int cost = -1; + private Map<Long, Long> cuboids = null; public CubeUpdate(CubeInstance cubeInstance) { this.cubeInstance = cubeInstance; @@ -98,4 +101,12 @@ public class CubeUpdate { 
this.cost = cost; return this; } + + public Map<Long, Long> getCuboids() { + return cuboids; + } + + public void setCuboids(Map<Long, Long> cuboids) { + this.cuboids = cuboids; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java new file mode 100644 index 0000000..43b2318 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.cube.cuboid.algorithm; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.cuboid.algorithm.generic.GeneticAlgorithm; +import org.apache.kylin.cube.cuboid.algorithm.greedy.GreedyAlgorithm; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Maps; + +public class CuboidRecommender { + private static final Logger logger = LoggerFactory.getLogger(CuboidRecommender.class); + + private static Cache<String, Map<Long, Long>> cuboidRecommendCache = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, Map<Long, Long>>() { + @Override + public void onRemoval(RemovalNotification<String, Map<Long, Long>> notification) { + logger.info("Dict with resource path " + notification.getKey() + " is removed due to " + + notification.getCause()); + } + }).maximumSize(KylinConfig.getInstanceFromEnv().getCubePlannerRecommendCuboidCacheMaxSize()) + .expireAfterWrite(1, TimeUnit.DAYS).build(); + + private class CuboidRecommenderSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + cuboidRecommendCache.invalidateAll(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + cuboidRecommendCache.invalidate(cacheKey); + } + } + + public CuboidRecommender() { + Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()).registerListener(new 
CuboidRecommenderSyncListener(), + "cube"); + } + + private static CuboidRecommender instance = new CuboidRecommender(); + + public static CuboidRecommender getInstance() { + return instance; + } + + public Map<Long, Long> getRecommendCuboidList(final CuboidStats cuboidStats, final KylinConfig kylinConfig) { + if (cuboidStats == null) { + return null; + } + final String key = cuboidStats.getKey(); + Map<Long, Long> results = cuboidRecommendCache.getIfPresent(key); + if (results == null) { + try { + results = cuboidRecommendCache.get(key, new Callable<Map<Long, Long>>() { + @Override + public Map<Long, Long> call() throws Exception { + // recommending flag + Map<Long, Long> emptyMap = Maps.newHashMap(); + cuboidRecommendCache.put(key, emptyMap); + try { + Map<Long, Long> recommendCuboid = getRecommendCuboidList(cuboidStats, kylinConfig, + true); + + if (recommendCuboid != null) { + logger.info("Add recommend cuboids for " + key + " to cache"); + cuboidRecommendCache.put(key, recommendCuboid); + } + + return recommendCuboid; + } catch (Exception e) { + cuboidRecommendCache.invalidate(key); + logger.error("Failed to get recommend cuboids for " + key + " in cache", e); + throw e; + } + } + }); + } catch (ExecutionException e) { + logger.error("Failed to get recommend cuboids for " + key); + } + } + return results; + } + + public Map<Long, Long> getRecommendCuboidList(CuboidStats cuboidStats, KylinConfig kylinConf, + boolean ifForceRecommend) { + long Threshold1 = 1L << kylinConf.getCubePlannerAgreedyAlgorithmAutoThreshold(); + long Threshold2 = 1L << kylinConf.getCubePlannerGeneticAlgorithmAutoThreshold(); + if (Threshold1 >= Threshold2) { + logger.error("Invalid Cube Planner Algorithm configuration"); + return null; + } + + int allCuboidCount = cuboidStats.getAllCuboidsForMandatory().size() + + cuboidStats.getAllCuboidsForSelection().size(); + + BenefitPolicy benefitPolicy = new PBPUSCalculator(cuboidStats); + CuboidRecommendAlgorithm algorithm = null; + + if 
(allCuboidCount <= Threshold2) { + algorithm = new GreedyAlgorithm(-1, benefitPolicy, cuboidStats); + } else { + algorithm = new GeneticAlgorithm(-1, benefitPolicy, cuboidStats); + } + + long startTime = System.currentTimeMillis(); + logger.info("Cube Planner Algorithm started at " + startTime); + List<Long> recommendCuboidList = algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold()); + logger.info("Cube Planner Algorithm ended at " + (System.currentTimeMillis() - startTime)); + + if (recommendCuboidList.size() < allCuboidCount) { + logger.info("Cube Planner Algorithm chooses " + recommendCuboidList.size() + + " most effective cuboids to build among of all " + allCuboidCount + " cuboids."); + } + + Map<Long, Long> recommendCuboidsWithStats = Maps.newLinkedHashMap(); + for (Long cuboid : recommendCuboidList) { + if (cuboid.equals(cuboidStats.getBaseCuboid())) { + recommendCuboidsWithStats.put(cuboid, cuboidStats.getCuboidCount(cuboid)); + } else if (cuboidStats.getAllCuboidsForSelection().contains(cuboid)) { + recommendCuboidsWithStats.put(cuboid, cuboidStats.getCuboidCount(cuboid)); + } else { + recommendCuboidsWithStats.put(cuboid, -1L); + } + } + + if (!ifForceRecommend && allCuboidCount <= Threshold1) { + return null; + } + return recommendCuboidsWithStats; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java new file mode 100644 index 0000000..0af5f98 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube; + +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Map; + +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.cube.cuboid.TreeCuboidScheduler; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class CubeInstanceTest { + + private CubeInstance cubeInstance; + + @Before + public void setUp() throws Exception { + InputStream fileInputStream = new FileInputStream("src/test/resources/learn_kylin_cube2.json"); + JsonSerializer<CubeInstance> jsonSerializer = new JsonSerializer<>(CubeInstance.class); + cubeInstance = jsonSerializer.deserialize(new DataInputStream(fileInputStream)); + } + + @Test + public void getTreeCuboidSchedulerTest() { + Map<Long, Long> cuboidsWithRowCnt = cubeInstance.getCuboids(); + TreeCuboidScheduler.CuboidTree.createFromCuboids(Lists.newArrayList(cuboidsWithRowCnt.keySet()), + new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt)); + } + + public void printMap(Map<Long, Long> map) { + System.out.println("size: " + map.size()); + for (Map.Entry<Long, Long> entry : map.entrySet()) { + System.out.println(entry.getKey() + ":" + entry.getValue()); + } + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/test/resources/learn_kylin_cube2.json ---------------------------------------------------------------------- diff --git a/core-cube/src/test/resources/learn_kylin_cube2.json b/core-cube/src/test/resources/learn_kylin_cube2.json new file mode 100644 index 0000000..1f50902 --- /dev/null +++ b/core-cube/src/test/resources/learn_kylin_cube2.json @@ -0,0 +1,84 @@ +{ + "uuid" : "bd66b324-4b0d-4fc7-a2ef-2b9ddced160d", + "last_modified" : 1503388415029, + "version" : "2.1.0", + "name" : "learn_kylin_cube2", + "owner" : "ADMIN", + "descriptor" : "learn_kylin_cube2", + "cost" : 50, + "status" : "READY", + "segments" : [ { + "uuid" : "4d017947-1a63-47a9-b31d-356219612cf6", + "name" : "20100101000000_20150101000000", + "storage_location_identifier" : "KYLIN_FKF6CWSIZC", + "date_range_start" : 1262304000000, + "date_range_end" : 1420070400000, + "source_offset_start" : 0, + "source_offset_end" : 0, + "status" : "READY", + "size_kb" : 22153, + "input_records" : 10000, + "input_records_size" : 984072, + "last_build_time" : 1503388048037, + "last_build_job_id" : "06c5bfd9-0443-4c28-810b-bcbd37fde458", + "create_time_utc" : 1503387574466, + "cuboid_shard_nums" : { }, + "total_shards" : 1, + "blackout_cuboids" : [ ], + "binary_signature" : null, + "dictionaries" : { + "KYLIN_SALES.LSTG_SITE_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict", + "KYLIN_SALES.LEAF_CATEG_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict", + "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict", + "KYLIN_SALES.SLR_SEGMENT_CD" : "/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict", + "KYLIN_SALES.LSTG_FORMAT_NAME" : "/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict" + }, + "snapshots" : { + "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : "/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot", + "DEFAULT.KYLIN_CAL_DT" : "/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot" + }, + "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ "KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], [ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ] + }, { + "uuid" : "4cb8c3e7-9624-44c4-b812-041cb5604e50", + "name" : "20100101000000_20150101000000", + "storage_location_identifier" : "KYLIN_VEXWSEB5Y7", + "date_range_start" : 1262304000000, + "date_range_end" : 1420070400000, + "source_offset_start" : 0, + "source_offset_end" : 0, + "status" : "NEW", + "size_kb" : 0, + "input_records" : 0, + "input_records_size" : 0, + "last_build_time" : 0, + "last_build_job_id" : null, + "create_time_utc" : 1503388305176, + "cuboid_shard_nums" : { }, + "total_shards" : 0, + "blackout_cuboids" : [ ], + "binary_signature" : null, + "dictionaries" : { + "KYLIN_SALES.LSTG_SITE_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict", + "KYLIN_SALES.LEAF_CATEG_ID" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict", + "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict", + 
"KYLIN_SALES.SLR_SEGMENT_CD" : "/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict", + "KYLIN_SALES.LSTG_FORMAT_NAME" : "/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict", + "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : "/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict" + }, + "snapshots" : { + "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : "/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot", + "DEFAULT.KYLIN_CAL_DT" : "/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot" + }, + "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ "KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], [ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ] + } ], + "create_time_utc" : 1502959568985, + "cuboidBytes" : "eAEdkTmWBTEIA+/iuAM24+Vq8+buv+TINEhCqP9GzDnuOebf6Bp3fmOPu76RMe7m8XG96huL76BRNm4yPz1uGS/0PhSdIG3lN9wQcg09llB9VEo31UyoPg2RMNQqDiQoxdOgwkEJlFrowiCEMzemCDMu8LnRXeBiaisG3yuJ2dm05uG2nG8p454yigDlKsgyYjqOzjPkBizNmSUyW6tc15hWSTgo3MGnp+JAYqd6R8zc2IyX3So5aJC1zgMAPTYpQwEdE9adgDkEqYXZ7hCJhYktJ1t/4glhb9NCEsXzg0kldceG0YtAXb/p5NK4KPdTLsVmkf8/QfNdrA==", + "cuboid_bytes_recommend" : null, + "last_optimized" : 0, + "size_kb" : 22153, + "input_records_count" : 10000, + "input_records_size" : 984072 +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 417697d..8f1da6e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -140,6 +140,10 @@ public class CubeStatsReader { return tempFile; } + public Map<Long, HLLCounter> getCuboidRowEstimatesHLLOrigin() { + return this.cuboidRowEstimatesHLL; + } + public Map<Long, Long> getCuboidRowEstimatesHLL() { return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java new file mode 100644 index 0000000..a0b20fa --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.algorithm.CuboidRecommender; +import org.apache.kylin.cube.cuboid.algorithm.CuboidStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CuboidRecommenderUtil { + + private static final Logger logger = LoggerFactory.getLogger(CuboidRecommenderUtil.class); + + /** Trigger cube planner phase one */ + public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment) throws IOException { + if (segment == null) { + return null; + } + + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + if (cubeStatsReader.getCuboidRowEstimatesHLL() == null + || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { + logger.info("Cuboid Statistics is not enabled."); + return null; + } + long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + logger.info("Base cuboid count in cuboid statistics is 0."); + return null; + } + + String key = segment.getCubeInstance().getName(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), + cubeStatsReader.getCuboidSizeMap()).build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false); + } + + /** Trigger cube planner phase two for optimization */ + public static Map<Long, Long> getRecommendCuboidList(CubeInstance cube, Map<Long, Long> hitFrequencyMap, + Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException { + + 
Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsUtil + .readCuboidStatsAndSizeFromCube(cube.getCuboidScheduler().getAllCuboidIds(), cube); + + String key = cube.getName(); + long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, statsPair.getFirst(), statsPair.getSecond()) + .setHitFrequencyMap(hitFrequencyMap).setRollingUpCountSourceMap(rollingUpCountSourceMap, + cube.getConfig().getCubePlannerMandatoryRollUpThreshold()) + .build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, cube.getConfig()); + } + + /** For future segment level recommend */ + public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment, Map<Long, Long> hitFrequencyMap, + Map<Long, Map<Long, Long>> rollingUpCountSourceMap, boolean ifForceRecommend) throws IOException { + if (segment == null) { + return null; + } + + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + if (cubeStatsReader.getCuboidRowEstimatesHLL() == null + || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { + logger.info("Cuboid Statistics is not enabled."); + return null; + } + long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + logger.info("Base cuboid count in cuboid statistics is 0."); + return null; + } + + String key = segment.getCubeInstance().getName() + "-" + segment.getName(); + CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), + cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap) + .setRollingUpCountSourceMap(rollingUpCountSourceMap, + segment.getConfig().getCubePlannerMandatoryRollUpThreshold()) + .build(); + return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, 
segment.getConfig(), + ifForceRecommend); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java new file mode 100644 index 0000000..5bb4179 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps;
+
+/**
+ * Static helpers that read per-cuboid statistics (row-count estimates and sizes) through
+ * {@link CubeStatsReader}, either for a single segment or merged over several segments.
+ */
+public class CuboidStatsUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(CuboidStatsUtil.class);
+
+    /**
+     * Reads the merged row-count estimates of the given cuboids from the READY segments of a cube.
+     *
+     * @param cuboidIds    cuboids to look up; must not be {@code null}
+     * @param cubeInstance cube whose READY segments are read; must not be {@code null}
+     * @return cuboid id to merged row-count estimate, or {@code null} when nothing was merged
+     * @throws IOException declared by the underlying statistics reader
+     */
+    public static Map<Long, Long> readCuboidStatsFromCube(Set<Long> cuboidIds, CubeInstance cubeInstance)
+            throws IOException {
+        Map<Long, Long> statisticsMerged = readCuboidStatsAndSizeFromCube(cuboidIds, cubeInstance).getFirst();
+        return statisticsMerged.isEmpty() ? null : statisticsMerged;
+    }
+
+    /**
+     * Reads both the merged row-count estimates and the merged sizes of the given cuboids from the
+     * READY segments of a cube.
+     *
+     * @param cuboidIds cuboids to look up; must not be {@code null}
+     * @param cube      cube whose READY segments are read; must not be {@code null}
+     * @return pair of (cuboid id to row-count estimate, cuboid id to size); both maps are empty when
+     *         no segment provided statistics
+     * @throws IOException declared by the underlying statistics reader
+     */
+    public static Pair<Map<Long, Long>, Map<Long, Double>> readCuboidStatsAndSizeFromCube(Set<Long> cuboidIds,
+            CubeInstance cube) throws IOException {
+        Preconditions.checkNotNull(cuboidIds, "The cuboid set can not be null");
+        Preconditions.checkNotNull(cube, "The cube instance can not be null");
+
+        List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY);
+        Map<Long, Long> statisticsMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged, sizeMerged);
+        return new Pair<>(statisticsMerged, sizeMerged);
+    }
+
+    /**
+     * Reads the merged row-count estimates of the given cuboids from an explicit list of segments.
+     *
+     * @param cuboidIds   cuboids to look up; must not be {@code null}
+     * @param segmentList segments to merge; {@code null} or empty yields a {@code null} result
+     * @return cuboid id to merged row-count estimate, or {@code null} when nothing was merged
+     * @throws IOException declared by the underlying statistics reader
+     */
+    public static Map<Long, Long> readCuboidStatsFromSegments(Set<Long> cuboidIds, List<CubeSegment> segmentList)
+            throws IOException {
+        // Same guard as readCuboidStatsAndSizeFromCube; a null set failed with a bare NPE before.
+        Preconditions.checkNotNull(cuboidIds, "The cuboid set can not be null");
+
+        Map<Long, Long> statisticsMerged = Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged,
+                Maps.<Long, Double> newHashMapWithExpectedSize(cuboidIds.size()));
+        return statisticsMerged.isEmpty() ? null : statisticsMerged;
+    }
+
+    /**
+     * Merges the statistics of all given segments into the two output maps.
+     *
+     * Segments whose HLL map is null or empty are skipped and not counted. For every requested
+     * cuboid the HLL counters of the remaining segments are merged and the sizes are summed; both
+     * results are then divided by the number of counted segments. The output maps are left
+     * untouched when there is no segment to read or no segment provided statistics.
+     *
+     * @throws NullPointerException if a counted segment has no HLL counter or no size for one of
+     *                              the requested cuboids
+     */
+    private static void readCuboidStatsFromSegments(Set<Long> cuboidSet, List<CubeSegment> segmentList,
+            final Map<Long, Long> statisticsMerged, final Map<Long, Double> sizeMerged) throws IOException {
+        if (segmentList == null || segmentList.isEmpty()) {
+            return;
+        }
+        int nSegment = segmentList.size();
+
+        Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
+        Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
+        for (CubeSegment pSegment : segmentList) {
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, pSegment.getConfig());
+            Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowEstimatesHLLOrigin();
+            if (pHLLMap == null || pHLLMap.isEmpty()) {
+                logger.info("Cuboid Statistics for segment {} is not enabled.", pSegment.getName());
+                nSegment--;
+                continue;
+            }
+            Map<Long, Double> pSizeMap = pReader.getCuboidSizeMap();
+            for (Long pCuboid : cuboidSet) {
+                // Template form: the message is only formatted when the check actually fails.
+                HLLCounter pInnerHLL = Preconditions.checkNotNull(pHLLMap.get(pCuboid),
+                        "statistics should exist for cuboid %s of segment %s[%s]", pCuboid,
+                        pSegment.getCubeDesc().getName(), pSegment.getName());
+                HLLCounter pMergedHLL = cuboidHLLMapMerged.get(pCuboid);
+                if (pMergedHLL != null) {
+                    pMergedHLL.merge(pInnerHLL);
+                } else {
+                    // NOTE(review): the first counter seen becomes the accumulator, so it is mutated
+                    // by later merges; confirm the reader does not hand out shared instances.
+                    cuboidHLLMapMerged.put(pCuboid, pInnerHLL);
+                }
+
+                // A missing size used to surface as a message-less NPE from auto-unboxing.
+                Double pSegmentSize = Preconditions.checkNotNull(pSizeMap.get(pCuboid),
+                        "size should exist for cuboid %s of segment %s[%s]", pCuboid,
+                        pSegment.getCubeDesc().getName(), pSegment.getName());
+                Double pSize = sizeMapMerged.get(pCuboid);
+                sizeMapMerged.put(pCuboid, pSize == null ? pSegmentSize : pSegmentSize + pSize);
+            }
+        }
+
+        if (nSegment < 1) {
+            return;
+        }
+        for (Long pCuboid : cuboidSet) {
+            statisticsMerged.put(pCuboid, cuboidHLLMapMerged.get(pCuboid).getCountEstimate() / nSegment);
+            sizeMerged.put(pCuboid, sizeMapMerged.get(pCuboid) / nSegment);
+        }
+    }
+
+    /**
+     * Reads the row-count estimates of the given cuboids from a single segment.
+     *
+     * Cuboids without an estimate are logged and left out of the result.
+     *
+     * @param cuboidIds   cuboids to look up
+     * @param cubeSegment segment to read; {@code null} yields a {@code null} result
+     * @return cuboid id to row-count estimate, or {@code null} when the segment is {@code null} or
+     *         has no statistics
+     * @throws IOException declared by the underlying statistics reader
+     */
+    public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, CubeSegment cubeSegment)
+            throws IOException {
+        if (cubeSegment == null) {
+            logger.warn("The cube segment can not be null");
+            return null;
+        }
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cubeSegment.getConfig());
+        // Read the estimates once; the reader was previously queried up to three times.
+        Map<Long, Long> cuboidsWithStatsAll = cubeStatsReader.getCuboidRowEstimatesHLL();
+        if (cuboidsWithStatsAll == null || cuboidsWithStatsAll.isEmpty()) {
+            logger.info("Cuboid Statistics is not enabled.");
+            return null;
+        }
+
+        Map<Long, Long> cuboidsWithStats = Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        for (Long cuboid : cuboidIds) {
+            Long rowEstimate = cuboidsWithStatsAll.get(cuboid);
+            if (rowEstimate == null) {
+                logger.warn("Cannot get the row count stats for cuboid {}", cuboid);
+            } else {
+                cuboidsWithStats.put(cuboid, rowEstimate);
+            }
+        }
+        return cuboidsWithStats;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java new file mode 100644 index 0000000..cfac0e7 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatisticsDecisionUtil { + protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class); + + public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException { + CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig()); + decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(), + cubeStats.getMapperNumberOfFirstBuild()); + } + + public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg, double mapperOverlapRatio, + int mapperNumber) throws IOException { + KylinConfig kylinConf = seg.getConfig(); + String algPref = kylinConf.getCubeAlgorithm(); + 
CubingJob.AlgorithmEnum alg; + if (CubingJob.AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) { + alg = CubingJob.AlgorithmEnum.INMEM; + } else if (CubingJob.AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) { + alg = CubingJob.AlgorithmEnum.LAYER; + } else { + int memoryHungryMeasures = 0; + for (MeasureDesc measure : seg.getCubeDesc().getMeasures()) { + if (measure.getFunction().getMeasureType().isMemoryHungry()) { + logger.info("This cube has memory-hungry measure " + measure.getFunction().getExpression()); + memoryHungryMeasures++; + } + } + + if (memoryHungryMeasures > 0) { + alg = CubingJob.AlgorithmEnum.LAYER; + } else if ("random".equalsIgnoreCase(algPref)) { // for testing + alg = new Random().nextBoolean() ? CubingJob.AlgorithmEnum.INMEM : CubingJob.AlgorithmEnum.LAYER; + } else { // the default + int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit(); + double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); + logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); + logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + + overlapThreshold); + + // in-mem cubing is good when + // 1) the cluster has enough mapper slots to run in parallel + // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage + alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)// + ? 
CubingJob.AlgorithmEnum.INMEM + : CubingJob.AlgorithmEnum.LAYER; + } + + } + logger.info("The cube algorithm for " + seg + " is " + alg); + + cubingJob.setAlgorithm(alg); + } + + public static void optimizeCubingPlan(CubeSegment segment) throws IOException { + CubeInstance cube = segment.getCubeInstance(); + List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); + if (readySegments.size() == 0 + || (cube.getConfig().isCubePlannerEnabledForExistingCube() && readySegments.size() == 1 + && (readySegments.get(0).getDateRangeStart() == segment.getDateRangeStart()) + && (readySegments.get(0).getDateRangeEnd() == segment.getDateRangeEnd()))) { + logger.info("It's able to trigger cuboid planner algorithm."); + } else { + return; + } + + Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment); + if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) { + return; + } + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setCuboids(recommendCuboidsWithStats); + CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 7544188..5971b6c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -19,7 +19,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.Random; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -30,14 +29,12 @@ import 
org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +68,15 @@ public class SaveStatisticsStep extends AbstractExecutable { // put the statistics to metadata store String statisticsFileName = newSegment.getStatisticsResourcePath(); rs.putResource(statisticsFileName, is, System.currentTimeMillis()); + + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment); + StatisticsDecisionUtil.optimizeCubingPlan(newSegment); } finally { IOUtils.closeStream(is); } - decideCubingAlgorithm(newSegment, kylinConf); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } catch (IOException e) { logger.error("fail to save cuboid statistics", e); @@ -84,47 +84,4 @@ public class SaveStatisticsStep extends AbstractExecutable { } } - private void decideCubingAlgorithm(CubeSegment seg, KylinConfig kylinConf) throws IOException { - String algPref = kylinConf.getCubeAlgorithm(); - AlgorithmEnum alg; - if (AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) { - alg = AlgorithmEnum.INMEM; - } else if (AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) { - alg = AlgorithmEnum.LAYER; - } else { - int memoryHungryMeasures = 0; - for (MeasureDesc 
measure : seg.getCubeDesc().getMeasures()) { - if (measure.getFunction().getMeasureType().isMemoryHungry()) { - logger.info("This cube has memory-hungry measure " + measure.getFunction().getExpression()); - memoryHungryMeasures++; - } - } - - if (memoryHungryMeasures > 0) { - alg = AlgorithmEnum.LAYER; - } else if ("random".equalsIgnoreCase(algPref)) { // for testing - alg = new Random().nextBoolean() ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; - } else { // the default - CubeStatsReader cubeStats = new CubeStatsReader(seg, kylinConf); - int mapperNumber = cubeStats.getMapperNumberOfFirstBuild(); - int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit(); - double mapperOverlapRatio = cubeStats.getMapperOverlapRatioOfFirstBuild(); - double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); - logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); - logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold); - - // in-mem cubing is good when - // 1) the cluster has enough mapper slots to run in parallel - // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage - alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)// - ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; - } - - } - logger.info("The cube algorithm for " + seg + " is " + alg); - - CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); - cubingJob.setAlgorithm(alg); - } - }