APACHE-KYLIN-2802: Enable cube planner phase one

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e75770f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e75770f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e75770f6

Branch: refs/heads/yaho-cube-planner
Commit: e75770f656f04612ca4e131b226731e6663ccb07
Parents: 697f848
Author: Zhong <nju_y...@apache.org>
Authored: Tue Aug 22 17:58:37 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Tue Aug 22 17:58:37 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  27 +++
 .../org/apache/kylin/common/util/JsonUtil.java  |   4 +
 .../org/apache/kylin/cube/CubeInstance.java     |  19 ++-
 .../java/org/apache/kylin/cube/CubeManager.java |   4 +
 .../java/org/apache/kylin/cube/CubeUpdate.java  |  11 ++
 .../cuboid/algorithm/CuboidRecommender.java     | 163 +++++++++++++++++++
 .../org/apache/kylin/cube/CubeInstanceTest.java |  57 +++++++
 .../src/test/resources/learn_kylin_cube2.json   |  84 ++++++++++
 .../kylin/engine/mr/common/CubeStatsReader.java |   4 +
 .../engine/mr/common/CuboidRecommenderUtil.java | 106 ++++++++++++
 .../kylin/engine/mr/common/CuboidStatsUtil.java | 135 +++++++++++++++
 .../mr/common/StatisticsDecisionUtil.java       | 110 +++++++++++++
 .../engine/mr/steps/SaveStatisticsStep.java     |  55 +------
 13 files changed, 722 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6feb51a..d66d7ce 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1126,6 +1126,33 @@ abstract public class KylinConfigBase implements 
Serializable {
     }
 
     // 
============================================================================
+    // Cube Planner
+    // 
============================================================================
+    /**
+     * Whether the cube planner may also be applied to cubes that already exist
+     * ({@code kylin.cube.cubeplanner.enabled-for-existing-cube}, default {@code false}).
+     */
+    public boolean isCubePlannerEnabledForExistingCube() {
+        return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false"));
+    }
+
+    /**
+     * Expansion-rate threshold handed to the cuboid recommend algorithm
+     * ({@code kylin.cube.cubeplanner.expansion-threshold}, default {@code 15.0}).
+     */
+    public double getCubePlannerExpansionRateThreshold() {
+        return Double.parseDouble(getOptional("kylin.cube.cubeplanner.expansion-threshold", "15.0"));
+    }
+
+    /**
+     * Maximum number of entries kept in the recommended-cuboid cache
+     * ({@code kylin.cube.cubeplanner.recommend-cache-max-size}, default {@code 200}).
+     */
+    public int getCubePlannerRecommendCuboidCacheMaxSize() {
+        return Integer.parseInt(getOptional("kylin.cube.cubeplanner.recommend-cache-max-size", "200"));
+    }
+
+    /**
+     * Roll-up count threshold passed along with the query rolling-up source map when building cuboid
+     * statistics ({@code kylin.cube.cubeplanner.mandatory-rollup-threshold}, default {@code 1000}).
+     * NOTE(review): presumably cuboids rolled up to more often than this become mandatory — confirm
+     * in CuboidStats.
+     */
+    public long getCubePlannerMandatoryRollUpThreshold() {
+        return Long.parseLong(getOptional("kylin.cube.cubeplanner.mandatory-rollup-threshold", "1000"));
+    }
+
+    /**
+     * Power-of-two exponent of the "too small to plan" cuboid count: the recommender compares the
+     * number of cuboids against {@code 1L << value} and, unless a recommendation is forced, returns
+     * nothing at or below it ({@code kylin.cube.cubeplanner.algorithm-threshold-greedy}, default
+     * {@code 10}). Must be smaller than {@link #getCubePlannerGeneticAlgorithmAutoThreshold()}.
+     */
+    public int getCubePlannerGreedyAlgorithmAutoThreshold() {
+        return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-greedy", "10"));
+    }
+
+    /**
+     * Misspelled alias of {@link #getCubePlannerGreedyAlgorithmAutoThreshold()}, kept so existing
+     * callers keep compiling; prefer the correctly spelled method.
+     */
+    public int getCubePlannerAgreedyAlgorithmAutoThreshold() {
+        return getCubePlannerGreedyAlgorithmAutoThreshold();
+    }
+
+    /**
+     * Power-of-two exponent choosing the recommend algorithm: up to {@code 1L << value} cuboids the
+     * greedy algorithm is used, above it the genetic algorithm
+     * ({@code kylin.cube.cubeplanner.algorithm-threshold-genetic}, default {@code 23}).
+     */
+    public int getCubePlannerGeneticAlgorithmAutoThreshold() {
+        return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23"));
+    }
+
+    // 
============================================================================
     // Metrics
     // 
============================================================================
     public boolean isKylinMetricsMonitorEnabled() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index f874b16..c30a1f2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -92,6 +92,10 @@ public class JsonUtil {
         return typeMapper.readValue(src, valueType);
     }
 
+    /**
+     * Deserializes a JSON string into the generic type captured by {@code valueTypeRef}
+     * (e.g. {@code new TypeReference<Map<Long, Long>>() {}}), so parameterized element types are
+     * available to Jackson despite erasure.
+     *
+     * @throws IOException if the content cannot be parsed or mapped
+     */
+    public static <T> T readValue(String content, TypeReference<T> valueTypeRef) throws IOException {
+        final T value = mapper.readValue(content, valueTypeRef);
+        return value;
+    }
+
     public static void writeValueIndent(OutputStream out, Object value)
             throws IOException, JsonGenerationException, JsonMappingException {
         indentMapper.writeValue(out, value);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 3f8bbbe..d217dcd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -22,8 +22,6 @@ import static 
org.apache.kylin.cube.cuboid.CuboidModeEnum.CURRENT;
 import static org.apache.kylin.cube.cuboid.CuboidModeEnum.RECOMMEND;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,6 +57,7 @@ import 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
@@ -359,21 +358,23 @@ public class CubeInstance extends RootPersistentEntity 
implements IRealization,
         }
     }
 
+    /**
+     * Decodes {@code cuboidBytes} (compressed, UTF-8 encoded JSON) into a cuboid map.
+     *
+     * @return the decoded map, or {@code null} when no bytes are stored or the decoded map is empty
+     * @throws RuntimeException wrapping any decompression or JSON parsing failure
+     */
-    public HashMap<Long, Long> getCuboids() {
+    public Map<Long, Long> getCuboids() {
         if (cuboidBytes == null)
             return null;
         byte[] uncompressed;
         try {
             uncompressed = CompressionUtils.decompress(cuboidBytes);
             String str = new String(uncompressed, "UTF-8");
-            HashMap<Long, Long> cuboids = JsonUtil.readValue(str, 
HashMap.class);
+            // TypeReference tells Jackson the Long key/value types; a raw HashMap.class read does not
+            TypeReference<Map<Long, Long>> typeRef = new 
TypeReference<Map<Long, Long>>() {
+            };
+            Map<Long, Long> cuboids = JsonUtil.readValue(str, typeRef);
             return cuboids.isEmpty() ? null : cuboids;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public void setCuboids(HashMap<Long, Long> cuboids) {
+    public void setCuboids(Map<Long, Long> cuboids) {
         if (cuboids == null)
             return;
         if (cuboids.isEmpty()) {
@@ -390,21 +391,23 @@ public class CubeInstance extends RootPersistentEntity 
implements IRealization,
         }
     }
 
+    /**
+     * Decodes {@code cuboidBytesRecommend} (compressed, UTF-8 encoded JSON) into the recommended
+     * cuboid set.
+     *
+     * @return the decoded set, or {@code null} when no bytes are stored or the decoded set is empty
+     * @throws RuntimeException wrapping any decompression or JSON parsing failure
+     */
-    public HashSet<Long> getCuboidsRecommend() {
+    public Set<Long> getCuboidsRecommend() {
         if (cuboidBytesRecommend == null)
             return null;
         byte[] uncompressed;
         try {
             uncompressed = CompressionUtils.decompress(cuboidBytesRecommend);
             String str = new String(uncompressed, "UTF-8");
-            HashSet<Long> cuboids = JsonUtil.readValue(str, HashSet.class);
+            // TypeReference tells Jackson the element type is Long; a raw HashSet.class read does not
+            TypeReference<Set<Long>> typeRef = new TypeReference<Set<Long>>() {
+            };
+            Set<Long> cuboids = JsonUtil.readValue(str, typeRef);
             return cuboids.isEmpty() ? null : cuboids;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public void setCuboidsRecommend(HashSet<Long> cuboids) {
+    public void setCuboidsRecommend(Set<Long> cuboids) {
         if (cuboids == null)
             return;
         if (cuboids.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 739400a..ec43fe3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -421,6 +421,10 @@ public class CubeManager implements IRealizationProvider {
             cube.setCost(update.getCost());
         }
 
+        if(update.getCuboids() != null){
+            cube.setCuboids(update.getCuboids());
+        }
+
         try {
             getStore().putResource(cube.getResourcePath(), cube, 
CUBE_SERIALIZER);
         } catch (IllegalStateException ise) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index dffaa48..fae20dc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.cube;
 
+import java.util.Map;
+
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 
 /**
@@ -31,6 +33,7 @@ public class CubeUpdate {
     private RealizationStatusEnum status;
     private String owner;
     private int cost = -1;
+    private Map<Long, Long> cuboids = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
         this.cubeInstance = cubeInstance;
@@ -98,4 +101,12 @@ public class CubeUpdate {
         this.cost = cost;
         return this;
     }
+
+    /**
+     * Returns the cuboid map to write to the cube; {@code null} (the default) leaves the cube's
+     * stored cuboids unchanged.
+     */
+    public Map<Long, Long> getCuboids() {
+        return cuboids;
+    }
+
+    /**
+     * Sets the cuboid map to write to the cube (presumably cuboid id to row count, as produced by
+     * the cuboid recommender — confirm with callers).
+     *
+     * @return this update, so calls can be chained like the other setters
+     */
+    public CubeUpdate setCuboids(Map<Long, Long> cuboids) {
+        this.cuboids = cuboids;
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
new file mode 100644
index 0000000..43b2318
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid.algorithm;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.cuboid.algorithm.generic.GeneticAlgorithm;
+import org.apache.kylin.cube.cuboid.algorithm.greedy.GreedyAlgorithm;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+
+public class CuboidRecommender {
+    private static final Logger logger = 
LoggerFactory.getLogger(CuboidRecommender.class);
+
+    private static Cache<String, Map<Long, Long>> cuboidRecommendCache = 
CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<String, Map<Long, Long>>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, Map<Long, 
Long>> notification) {
+                    logger.info("Dict with resource path " + 
notification.getKey() + " is removed due to "
+                            + notification.getCause());
+                }
+            
}).maximumSize(KylinConfig.getInstanceFromEnv().getCubePlannerRecommendCuboidCacheMaxSize())
+            .expireAfterWrite(1, TimeUnit.DAYS).build();
+
+    private class CuboidRecommenderSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            cuboidRecommendCache.invalidateAll();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            cuboidRecommendCache.invalidate(cacheKey);
+        }
+    }
+
+    public CuboidRecommender() {
+        
Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()).registerListener(new 
CuboidRecommenderSyncListener(),
+                "cube");
+    }
+
+    private static CuboidRecommender instance = new CuboidRecommender();
+
+    public static CuboidRecommender getInstance() {
+        return instance;
+    }
+
+    public Map<Long, Long> getRecommendCuboidList(final CuboidStats 
cuboidStats, final KylinConfig kylinConfig) {
+        if (cuboidStats == null) {
+            return null;
+        }
+        final String key = cuboidStats.getKey();
+        Map<Long, Long> results = cuboidRecommendCache.getIfPresent(key);
+        if (results == null) {
+            try {
+                results = cuboidRecommendCache.get(key, new Callable<Map<Long, 
Long>>() {
+                    @Override
+                    public Map<Long, Long> call() throws Exception {
+                        // recommending flag
+                        Map<Long, Long> emptyMap = Maps.newHashMap();
+                        cuboidRecommendCache.put(key, emptyMap);
+                        try {
+                            Map<Long, Long> recommendCuboid = 
getRecommendCuboidList(cuboidStats, kylinConfig,
+                                    true);
+
+                            if (recommendCuboid != null) {
+                                logger.info("Add recommend cuboids for " + key 
+ " to cache");
+                                cuboidRecommendCache.put(key, recommendCuboid);
+                            }
+
+                            return recommendCuboid;
+                        } catch (Exception e) {
+                            cuboidRecommendCache.invalidate(key);
+                            logger.error("Failed to get recommend cuboids for 
" + key + " in cache", e);
+                            throw e;
+                        }
+                    }
+                });
+            } catch (ExecutionException e) {
+                logger.error("Failed to get recommend cuboids for " + key);
+            }
+        }
+        return results;
+    }
+
+    public Map<Long, Long> getRecommendCuboidList(CuboidStats cuboidStats, 
KylinConfig kylinConf,
+            boolean ifForceRecommend) {
+        long Threshold1 = 1L << 
kylinConf.getCubePlannerAgreedyAlgorithmAutoThreshold();
+        long Threshold2 = 1L << 
kylinConf.getCubePlannerGeneticAlgorithmAutoThreshold();
+        if (Threshold1 >= Threshold2) {
+            logger.error("Invalid Cube Planner Algorithm configuration");
+            return null;
+        }
+
+        int allCuboidCount = cuboidStats.getAllCuboidsForMandatory().size()
+                + cuboidStats.getAllCuboidsForSelection().size();
+
+        BenefitPolicy benefitPolicy = new PBPUSCalculator(cuboidStats);
+        CuboidRecommendAlgorithm algorithm = null;
+
+        if (allCuboidCount <= Threshold2) {
+            algorithm = new GreedyAlgorithm(-1, benefitPolicy, cuboidStats);
+        } else {
+            algorithm = new GeneticAlgorithm(-1, benefitPolicy, cuboidStats);
+        }
+
+        long startTime = System.currentTimeMillis();
+        logger.info("Cube Planner Algorithm started at " + startTime);
+        List<Long> recommendCuboidList = 
algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold());
+        logger.info("Cube Planner Algorithm ended at " + 
(System.currentTimeMillis() - startTime));
+
+        if (recommendCuboidList.size() < allCuboidCount) {
+            logger.info("Cube Planner Algorithm chooses " + 
recommendCuboidList.size()
+                    + " most effective cuboids to build among of all " + 
allCuboidCount + " cuboids.");
+        }
+
+        Map<Long, Long> recommendCuboidsWithStats = Maps.newLinkedHashMap();
+        for (Long cuboid : recommendCuboidList) {
+            if (cuboid.equals(cuboidStats.getBaseCuboid())) {
+                recommendCuboidsWithStats.put(cuboid, 
cuboidStats.getCuboidCount(cuboid));
+            } else if 
(cuboidStats.getAllCuboidsForSelection().contains(cuboid)) {
+                recommendCuboidsWithStats.put(cuboid, 
cuboidStats.getCuboidCount(cuboid));
+            } else {
+                recommendCuboidsWithStats.put(cuboid, -1L);
+            }
+        }
+
+        if (!ifForceRecommend && allCuboidCount <= Threshold1) {
+            return null;
+        }
+        return recommendCuboidsWithStats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java
new file mode 100644
index 0000000..0af5f98
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeInstanceTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CubeInstanceTest {
+
+    private CubeInstance cubeInstance;
+
+    @Before
+    public void setUp() throws Exception {
+        InputStream fileInputStream = new 
FileInputStream("src/test/resources/learn_kylin_cube2.json");
+        JsonSerializer<CubeInstance> jsonSerializer = new 
JsonSerializer<>(CubeInstance.class);
+        cubeInstance = jsonSerializer.deserialize(new 
DataInputStream(fileInputStream));
+    }
+
+    @Test
+    public void getTreeCuboidSchedulerTest() {
+        Map<Long, Long> cuboidsWithRowCnt = cubeInstance.getCuboids();
+        
TreeCuboidScheduler.CuboidTree.createFromCuboids(Lists.newArrayList(cuboidsWithRowCnt.keySet()),
+                new 
TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt));
+    }
+
+    public void printMap(Map<Long, Long> map) {
+        System.out.println("size: " + map.size());
+        for (Map.Entry<Long, Long> entry : map.entrySet()) {
+            System.out.println(entry.getKey() + ":" + entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/core-cube/src/test/resources/learn_kylin_cube2.json
----------------------------------------------------------------------
diff --git a/core-cube/src/test/resources/learn_kylin_cube2.json 
b/core-cube/src/test/resources/learn_kylin_cube2.json
new file mode 100644
index 0000000..1f50902
--- /dev/null
+++ b/core-cube/src/test/resources/learn_kylin_cube2.json
@@ -0,0 +1,84 @@
+{
+  "uuid" : "bd66b324-4b0d-4fc7-a2ef-2b9ddced160d",
+  "last_modified" : 1503388415029,
+  "version" : "2.1.0",
+  "name" : "learn_kylin_cube2",
+  "owner" : "ADMIN",
+  "descriptor" : "learn_kylin_cube2",
+  "cost" : 50,
+  "status" : "READY",
+  "segments" : [ {
+    "uuid" : "4d017947-1a63-47a9-b31d-356219612cf6",
+    "name" : "20100101000000_20150101000000",
+    "storage_location_identifier" : "KYLIN_FKF6CWSIZC",
+    "date_range_start" : 1262304000000,
+    "date_range_end" : 1420070400000,
+    "source_offset_start" : 0,
+    "source_offset_end" : 0,
+    "status" : "READY",
+    "size_kb" : 22153,
+    "input_records" : 10000,
+    "input_records_size" : 984072,
+    "last_build_time" : 1503388048037,
+    "last_build_job_id" : "06c5bfd9-0443-4c28-810b-bcbd37fde458",
+    "create_time_utc" : 1503387574466,
+    "cuboid_shard_nums" : { },
+    "total_shards" : 1,
+    "blackout_cuboids" : [ ],
+    "binary_signature" : null,
+    "dictionaries" : {
+      "KYLIN_SALES.LSTG_SITE_ID" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict",
+      "KYLIN_SALES.LEAF_CATEG_ID" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict",
+      "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict",
+      "KYLIN_SALES.SLR_SEGMENT_CD" : 
"/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict",
+      "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict",
+      "KYLIN_SALES.LSTG_FORMAT_NAME" : 
"/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict",
+      "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict"
+    },
+    "snapshots" : {
+      "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : 
"/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot",
+      "DEFAULT.KYLIN_CAL_DT" : 
"/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot"
+    },
+    "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ 
"KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], 
[ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ]
+  }, {
+    "uuid" : "4cb8c3e7-9624-44c4-b812-041cb5604e50",
+    "name" : "20100101000000_20150101000000",
+    "storage_location_identifier" : "KYLIN_VEXWSEB5Y7",
+    "date_range_start" : 1262304000000,
+    "date_range_end" : 1420070400000,
+    "source_offset_start" : 0,
+    "source_offset_end" : 0,
+    "status" : "NEW",
+    "size_kb" : 0,
+    "input_records" : 0,
+    "input_records_size" : 0,
+    "last_build_time" : 0,
+    "last_build_job_id" : null,
+    "create_time_utc" : 1503388305176,
+    "cuboid_shard_nums" : { },
+    "total_shards" : 0,
+    "blackout_cuboids" : [ ],
+    "binary_signature" : null,
+    "dictionaries" : {
+      "KYLIN_SALES.LSTG_SITE_ID" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/SITE_ID/d1dbea6a-4929-43ef-bf2a-cf356d9de9eb.dict",
+      "KYLIN_SALES.LEAF_CATEG_ID" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/LEAF_CATEG_ID/60f6c00a-2137-4d6f-90e6-5433d30e326d.dict",
+      "KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/META_CATEG_NAME/e375c3b2-bebf-4f9a-95f3-abb76a7bdc80.dict",
+      "KYLIN_SALES.SLR_SEGMENT_CD" : 
"/dict/DEFAULT.KYLIN_SALES/SLR_SEGMENT_CD/ddb1659c-85cc-4d4f-8aae-40c6618f61cc.dict",
+      "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/7bbcdb05-4cf9-4d31-942c-828c5bbcaab8.dict",
+      "KYLIN_SALES.LSTG_FORMAT_NAME" : 
"/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/35669f98-38f2-4e67-9547-1b0b446a1675.dict",
+      "KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME" : 
"/dict/DEFAULT.KYLIN_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/a83a92e5-7711-4a34-bff8-ab3da625e996.dict"
+    },
+    "snapshots" : {
+      "DEFAULT.KYLIN_CATEGORY_GROUPINGS" : 
"/table_snapshot/DEFAULT.KYLIN_CATEGORY_GROUPINGS/90b8251c-96c9-45a6-a4db-9fe49e8a0581.snapshot",
+      "DEFAULT.KYLIN_CAL_DT" : 
"/table_snapshot/DEFAULT.KYLIN_CAL_DT/35abb8cf-4e70-428e-adef-457e5a085aa0.snapshot"
+    },
+    "rowkey_stats" : [ [ "KYLIN_SALES.LSTG_FORMAT_NAME", 5, 1 ], [ 
"KYLIN_SALES.SLR_SEGMENT_CD", 8, 1 ], [ "KYLIN_SALES.LEAF_CATEG_ID", 134, 1 ], 
[ "KYLIN_SALES.LSTG_SITE_ID", 8, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.META_CATEG_NAME", 44, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.CATEG_LVL2_NAME", 94, 1 ], [ 
"KYLIN_CATEGORY_GROUPINGS.CATEG_LVL3_NAME", 127, 1 ] ]
+  } ],
+  "create_time_utc" : 1502959568985,
+  "cuboidBytes" : 
"eAEdkTmWBTEIA+/iuAM24+Vq8+buv+TINEhCqP9GzDnuOebf6Bp3fmOPu76RMe7m8XG96huL76BRNm4yPz1uGS/0PhSdIG3lN9wQcg09llB9VEo31UyoPg2RMNQqDiQoxdOgwkEJlFrowiCEMzemCDMu8LnRXeBiaisG3yuJ2dm05uG2nG8p454yigDlKsgyYjqOzjPkBizNmSUyW6tc15hWSTgo3MGnp+JAYqd6R8zc2IyX3So5aJC1zgMAPTYpQwEdE9adgDkEqYXZ7hCJhYktJ1t/4glhb9NCEsXzg0kldceG0YtAXb/p5NK4KPdTLsVmkf8/QfNdrA==",
+  "cuboid_bytes_recommend" : null,
+  "last_optimized" : 0,
+  "size_kb" : 22153,
+  "input_records_count" : 10000,
+  "input_records_size" : 984072
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 417697d..8f1da6e 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -140,6 +140,10 @@ public class CubeStatsReader {
         return tempFile;
     }
 
+    /**
+     * Returns the raw per-cuboid HLL counters held by this reader, without the sampling-based
+     * row-count conversion that {@link #getCuboidRowEstimatesHLL()} applies. The internal map is
+     * returned directly, not a copy.
+     */
+    public Map<Long, HLLCounter> getCuboidRowEstimatesHLLOrigin() {
+        final Map<Long, HLLCounter> origin = cuboidRowEstimatesHLL;
+        return origin;
+    }
+
     public Map<Long, Long> getCuboidRowEstimatesHLL() {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, 
samplingPercentage);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
new file mode 100644
index 0000000..a0b20fa
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.algorithm.CuboidRecommender;
+import org.apache.kylin.cube.cuboid.algorithm.CuboidStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CuboidRecommenderUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(CuboidRecommenderUtil.class);
+
+    /**
+     * Triggers cube planner phase one for a segment, using only the segment's cuboid statistics.
+     *
+     * @return the recommended cuboids with row counts, or {@code null} when the segment is null, its
+     *         statistics are unusable, or the recommender declines to recommend
+     */
+    public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment) throws IOException {
+        if (segment == null) {
+            return null;
+        }
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+        // getCuboidRowEstimatesHLL() converts the HLL counters on every call, so read it once
+        Map<Long, Long> cuboidRowCounts = cubeStatsReader.getCuboidRowEstimatesHLL();
+        if (!hasStatistics(cuboidRowCounts)) {
+            return null;
+        }
+        long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+        if (!hasBaseCuboidRows(cuboidRowCounts, baseCuboid)) {
+            return null;
+        }
+
+        String key = segment.getCubeInstance().getName();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cuboidRowCounts,
+                cubeStatsReader.getCuboidSizeMap()).build();
+        return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false);
+    }
+
+    /**
+     * Triggers cube planner phase two (optimization) for a whole cube, combining the cube's
+     * statistics with query hit frequencies and rolling-up sources.
+     */
+    public static Map<Long, Long> getRecommendCuboidList(CubeInstance cube, Map<Long, Long> hitFrequencyMap,
+            Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException {
+
+        Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsUtil
+                .readCuboidStatsAndSizeFromCube(cube.getCuboidScheduler().getAllCuboidIds(), cube);
+
+        String key = cube.getName();
+        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, statsPair.getFirst(), statsPair.getSecond())
+                .setHitFrequencyMap(hitFrequencyMap).setRollingUpCountSourceMap(rollingUpCountSourceMap,
+                        cube.getConfig().getCubePlannerMandatoryRollUpThreshold())
+                .build();
+        return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, cube.getConfig());
+    }
+
+    /** For future segment level recommend: combines segment statistics with query history. */
+    public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment, Map<Long, Long> hitFrequencyMap,
+            Map<Long, Map<Long, Long>> rollingUpCountSourceMap, boolean ifForceRecommend) throws IOException {
+        if (segment == null) {
+            return null;
+        }
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+        Map<Long, Long> cuboidRowCounts = cubeStatsReader.getCuboidRowEstimatesHLL();
+        if (!hasStatistics(cuboidRowCounts)) {
+            return null;
+        }
+        long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+        if (!hasBaseCuboidRows(cuboidRowCounts, baseCuboid)) {
+            return null;
+        }
+
+        String key = segment.getCubeInstance().getName() + "-" + segment.getName();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cuboidRowCounts,
+                cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap)
+                        .setRollingUpCountSourceMap(rollingUpCountSourceMap,
+                                segment.getConfig().getCubePlannerMandatoryRollUpThreshold())
+                        .build();
+        return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(),
+                ifForceRecommend);
+    }
+
+    /** Returns false (and logs why) when no cuboid statistics were collected. */
+    private static boolean hasStatistics(Map<Long, Long> cuboidRowCounts) {
+        if (cuboidRowCounts == null || cuboidRowCounts.isEmpty()) {
+            logger.info("Cuboid Statistics is not enabled.");
+            return false;
+        }
+        return true;
+    }
+
+    /** Returns false (and logs why) when the base cuboid has no, or a zero, row count. */
+    private static boolean hasBaseCuboidRows(Map<Long, Long> cuboidRowCounts, long baseCuboid) {
+        Long baseCuboidRows = cuboidRowCounts.get(baseCuboid);
+        if (baseCuboidRows == null || baseCuboidRows == 0L) {
+            logger.info("Base cuboid count in cuboid statistics is 0.");
+            return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
new file mode 100644
index 0000000..5bb4179
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class CuboidStatsUtil {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(CuboidStatsUtil.class);
+
+    public static Map<Long, Long> readCuboidStatsFromCube(Set<Long> cuboidIds, 
CubeInstance cubeInstance)
+            throws IOException {
+        Map<Long, Long> statisticsMerged = 
readCuboidStatsAndSizeFromCube(cuboidIds, cubeInstance).getFirst();
+        return statisticsMerged.isEmpty() ? null : statisticsMerged;
+    }
+
+    public static Pair<Map<Long, Long>, Map<Long, Double>> 
readCuboidStatsAndSizeFromCube(Set<Long> cuboidIds,
+            CubeInstance cube) throws IOException {
+        Preconditions.checkNotNull(cuboidIds, "The cuboid set can not be 
null");
+        Preconditions.checkNotNull(cube, "The cube instance can not be null");
+
+        List<CubeSegment> segmentList = 
cube.getSegments(SegmentStatusEnum.READY);
+        Map<Long, Long> statisticsMerged = 
Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        Map<Long, Double> sizeMerged = 
Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged, 
sizeMerged);
+        return new Pair<>(statisticsMerged, sizeMerged);
+    }
+
+    public static Map<Long, Long> readCuboidStatsFromSegments(Set<Long> 
cuboidIds, List<CubeSegment> segmentList)
+            throws IOException {
+        Map<Long, Long> statisticsMerged = 
Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        readCuboidStatsFromSegments(cuboidIds, segmentList, statisticsMerged,
+                Maps.<Long, Double> 
newHashMapWithExpectedSize(cuboidIds.size()));
+        return statisticsMerged.isEmpty() ? null : statisticsMerged;
+    }
+
+    private static void readCuboidStatsFromSegments(Set<Long> cuboidSet, 
List<CubeSegment> segmentList,
+            final Map<Long, Long> statisticsMerged, final Map<Long, Double> 
sizeMerged) throws IOException {
+        if (segmentList == null || segmentList.isEmpty()) {
+            return;
+        }
+        int nSegment = segmentList.size();
+
+        Map<Long, HLLCounter> cuboidHLLMapMerged = 
Maps.newHashMapWithExpectedSize(cuboidSet.size());
+        Map<Long, Double> sizeMapMerged = 
Maps.newHashMapWithExpectedSize(cuboidSet.size());
+        for (CubeSegment pSegment : segmentList) {
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, 
pSegment.getConfig());
+            Map<Long, HLLCounter> pHLLMap = 
pReader.getCuboidRowEstimatesHLLOrigin();
+            if (pHLLMap == null || pHLLMap.isEmpty()) {
+                logger.info("Cuboid Statistics for segment " + 
pSegment.getName() + " is not enabled.");
+                nSegment--;
+                continue;
+            }
+            Map<Long, Double> pSizeMap = pReader.getCuboidSizeMap();
+            for (Long pCuboid : cuboidSet) {
+                HLLCounter pInnerHLL = pHLLMap.get(pCuboid);
+                Preconditions.checkNotNull(pInnerHLL, "statistics should exist 
for cuboid " + pCuboid + " of segment "
+                        + pSegment.getCubeDesc().getName() + "[" + 
pSegment.getName() + "]");
+                if (cuboidHLLMapMerged.get(pCuboid) != null) {
+                    cuboidHLLMapMerged.get(pCuboid).merge(pInnerHLL);
+                } else {
+                    cuboidHLLMapMerged.put(pCuboid, pInnerHLL);
+                }
+
+                Double pSize = sizeMapMerged.get(pCuboid);
+                sizeMapMerged.put(pCuboid, pSize == null ? 
pSizeMap.get(pCuboid) : pSizeMap.get(pCuboid) + pSize);
+            }
+        }
+
+        if (nSegment < 1) {
+            return;
+        }
+        for (Long pCuboid : cuboidSet) {
+            statisticsMerged.put(pCuboid, 
cuboidHLLMapMerged.get(pCuboid).getCountEstimate() / nSegment);
+            sizeMerged.put(pCuboid, sizeMapMerged.get(pCuboid) / nSegment);
+        }
+    }
+
+    public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> 
cuboidIds, CubeSegment cubeSegment)
+            throws IOException {
+        if (cubeSegment == null) {
+            logger.warn("The cube segment can not be " + null);
+            return null;
+        }
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, 
cubeSegment.getConfig());
+        if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
+                || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
+            logger.info("Cuboid Statistics is not enabled.");
+            return null;
+        }
+
+        Map<Long, Long> cuboidsWithStatsAll = 
cubeStatsReader.getCuboidRowEstimatesHLL();
+        Map<Long, Long> cuboidsWithStats = 
Maps.newHashMapWithExpectedSize(cuboidIds.size());
+        for (Long cuboid : cuboidIds) {
+            Long rowEstimate = cuboidsWithStatsAll.get(cuboid);
+            if (rowEstimate == null) {
+                logger.warn("Cannot get the row count stats for cuboid " + 
cuboid);
+            } else {
+                cuboidsWithStats.put(cuboid, rowEstimate);
+            }
+        }
+        return cuboidsWithStats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
new file mode 100644
index 0000000..cfac0e7
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticsDecisionUtil {
+    protected static final Logger logger = 
LoggerFactory.getLogger(StatisticsDecisionUtil.class);
+
+    public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment 
seg) throws IOException {
+        CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig());
+        decideCubingAlgorithm(cubingJob, seg, 
cubeStats.getMapperOverlapRatioOfFirstBuild(),
+                cubeStats.getMapperNumberOfFirstBuild());
+    }
+
+    public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment 
seg, double mapperOverlapRatio,
+            int mapperNumber) throws IOException {
+        KylinConfig kylinConf = seg.getConfig();
+        String algPref = kylinConf.getCubeAlgorithm();
+        CubingJob.AlgorithmEnum alg;
+        if (CubingJob.AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) {
+            alg = CubingJob.AlgorithmEnum.INMEM;
+        } else if 
(CubingJob.AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) {
+            alg = CubingJob.AlgorithmEnum.LAYER;
+        } else {
+            int memoryHungryMeasures = 0;
+            for (MeasureDesc measure : seg.getCubeDesc().getMeasures()) {
+                if (measure.getFunction().getMeasureType().isMemoryHungry()) {
+                    logger.info("This cube has memory-hungry measure " + 
measure.getFunction().getExpression());
+                    memoryHungryMeasures++;
+                }
+            }
+
+            if (memoryHungryMeasures > 0) {
+                alg = CubingJob.AlgorithmEnum.LAYER;
+            } else if ("random".equalsIgnoreCase(algPref)) { // for testing
+                alg = new Random().nextBoolean() ? 
CubingJob.AlgorithmEnum.INMEM : CubingJob.AlgorithmEnum.LAYER;
+            } else { // the default
+                int mapperNumLimit = 
kylinConf.getCubeAlgorithmAutoMapperLimit();
+                double overlapThreshold = 
kylinConf.getCubeAlgorithmAutoThreshold();
+                logger.info("mapperNumber for " + seg + " is " + mapperNumber 
+ " and threshold is " + mapperNumLimit);
+                logger.info("mapperOverlapRatio for " + seg + " is " + 
mapperOverlapRatio + " and threshold is "
+                        + overlapThreshold);
+
+                // in-mem cubing is good when
+                // 1) the cluster has enough mapper slots to run in parallel
+                // 2) the mapper overlap ratio is small, meaning the shuffle 
of in-mem MR has advantage
+                alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= 
overlapThreshold)//
+                        ? CubingJob.AlgorithmEnum.INMEM
+                        : CubingJob.AlgorithmEnum.LAYER;
+            }
+
+        }
+        logger.info("The cube algorithm for " + seg + " is " + alg);
+
+        cubingJob.setAlgorithm(alg);
+    }
+
+    public static void optimizeCubingPlan(CubeSegment segment) throws 
IOException {
+        CubeInstance cube = segment.getCubeInstance();
+        List<CubeSegment> readySegments = 
cube.getSegments(SegmentStatusEnum.READY);
+        if (readySegments.size() == 0
+                || (cube.getConfig().isCubePlannerEnabledForExistingCube() && 
readySegments.size() == 1
+                        && (readySegments.get(0).getDateRangeStart() == 
segment.getDateRangeStart())
+                        && (readySegments.get(0).getDateRangeEnd() == 
segment.getDateRangeEnd()))) {
+            logger.info("It's able to trigger cuboid planner algorithm.");
+        } else {
+            return;
+        }
+
+        Map<Long, Long> recommendCuboidsWithStats = 
CuboidRecommenderUtil.getRecommendCuboidList(segment);
+        if (recommendCuboidsWithStats == null || 
recommendCuboidsWithStats.isEmpty()) {
+            return;
+        }
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setCuboids(recommendCuboidsWithStats);
+        CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e75770f6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 7544188..5971b6c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.Random;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,14 +29,12 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,12 +68,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 // put the statistics to metadata store
                 String statisticsFileName = 
newSegment.getStatisticsResourcePath();
                 rs.putResource(statisticsFileName, is, 
System.currentTimeMillis());
+
+                CubingJob cubingJob = (CubingJob) getManager()
+                        
.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+                StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, 
newSegment);
+                StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
             } finally {
                 IOUtils.closeStream(is);
             }
 
-            decideCubingAlgorithm(newSegment, kylinConf);
-
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to save cuboid statistics", e);
@@ -84,47 +84,4 @@ public class SaveStatisticsStep extends AbstractExecutable {
         }
     }
 
-    private void decideCubingAlgorithm(CubeSegment seg, KylinConfig kylinConf) 
throws IOException {
-        String algPref = kylinConf.getCubeAlgorithm();
-        AlgorithmEnum alg;
-        if (AlgorithmEnum.INMEM.name().equalsIgnoreCase(algPref)) {
-            alg = AlgorithmEnum.INMEM;
-        } else if (AlgorithmEnum.LAYER.name().equalsIgnoreCase(algPref)) {
-            alg = AlgorithmEnum.LAYER;
-        } else {
-            int memoryHungryMeasures = 0;
-            for (MeasureDesc measure : seg.getCubeDesc().getMeasures()) {
-                if (measure.getFunction().getMeasureType().isMemoryHungry()) {
-                    logger.info("This cube has memory-hungry measure " + 
measure.getFunction().getExpression());
-                    memoryHungryMeasures++;
-                }
-            }
-
-            if (memoryHungryMeasures > 0) {
-                alg = AlgorithmEnum.LAYER;
-            } else if ("random".equalsIgnoreCase(algPref)) { // for testing
-                alg = new Random().nextBoolean() ? AlgorithmEnum.INMEM : 
AlgorithmEnum.LAYER;
-            } else { // the default
-                CubeStatsReader cubeStats = new CubeStatsReader(seg, 
kylinConf);
-                int mapperNumber = cubeStats.getMapperNumberOfFirstBuild();
-                int mapperNumLimit = 
kylinConf.getCubeAlgorithmAutoMapperLimit();
-                double mapperOverlapRatio = 
cubeStats.getMapperOverlapRatioOfFirstBuild();
-                double overlapThreshold = 
kylinConf.getCubeAlgorithmAutoThreshold();
-                logger.info("mapperNumber for " + seg + " is " + mapperNumber 
+ " and threshold is " + mapperNumLimit);
-                logger.info("mapperOverlapRatio for " + seg + " is " + 
mapperOverlapRatio + " and threshold is " + overlapThreshold);
-
-                // in-mem cubing is good when
-                // 1) the cluster has enough mapper slots to run in parallel
-                // 2) the mapper overlap ratio is small, meaning the shuffle 
of in-mem MR has advantage
-                alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= 
overlapThreshold)//
-                        ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER;
-            }
-
-        }
-        logger.info("The cube algorithm for " + seg + " is " + alg);
-
-        CubingJob cubingJob = (CubingJob) 
getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
-        cubingJob.setAlgorithm(alg);
-    }
-
 }

Reply via email to