APACHE-KYLIN-2866: Increase the number of reducers used for HyperLogLog statistics calculation in the FactDistinctColumnsJob step

Signed-off-by: Zhong <[email protected]>
Signed-off-by: lidongsjtu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/377ec491
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/377ec491
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/377ec491

Branch: refs/heads/master
Commit: 377ec491c66e36a217012547fa5ea9b00ae361cc
Parents: a8f35cf
Author: Wang Ken <[email protected]>
Authored: Wed Sep 13 11:36:02 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 ++
 .../apache/kylin/common/util/HadoopUtil.java    |  19 ++++
 .../mr/steps/FactDistinctColumnPartitioner.java |  38 ++++++-
 .../engine/mr/steps/FactDistinctColumnsJob.java |  20 +++-
 .../mr/steps/FactDistinctColumnsReducer.java    |  55 ++++++----
 .../engine/mr/steps/SaveStatisticsStep.java     | 107 ++++++++++++++++++-
 6 files changed, 215 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 524b1d4..23a2120 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1040,6 +1040,14 @@ abstract public class KylinConfigBase implements 
Serializable {
         return 
Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator",
 "100"));
     }
 
+    public int getFactDistinctJobPerReducerHLLCuboidNumber() { // cuboids per HLL reducer in FactDistinctColumnsJob; default 100
+        return 
Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number",
 "100"));
+    }
+
+    public int getFactDistinctJobHLLMaxReducerNumber() { // upper bound on the number of HLL reducers; default 50
+        return 
Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-hll-max-reducer-number",
 "50"));
+    }
+
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the 
GlobalDictionaryColumns
     public int getUHCReducerCount() {
         return 
Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f242515..cafcaf2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -160,4 +160,23 @@ public class HadoopUtil {
             return null;
         }
     }
+
+    public static Path[] getFilterPath(FileSystem fs, Path baseDir, final 
String filter) throws IOException { // returns null if baseDir is missing, but an EMPTY array if nothing matches - callers must check both
+        if (fs.exists(baseDir) == false) {
+            return null;
+        }
+
+        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(filter); // prefix match on the file name only
+            }
+        });
+
+        Path[] result = new Path[fileStatus.length];
+        for (int i = 0; i < fileStatus.length; i++) {
+            result[i] = fileStatus[i].getPath();
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 5fcfe42..7ac5d02 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,25 +18,55 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
-public class FactDistinctColumnPartitioner extends 
Partitioner<SelfDefineSortableKey, Text> {
+public class FactDistinctColumnPartitioner extends 
Partitioner<SelfDefineSortableKey, Text> implements Configurable {
+    private static final Logger logger = 
LoggerFactory.getLogger(FactDistinctColumnPartitioner.class);
+
+    public static final String HLL_SHARD_BASE_PROPERTY_NAME = 
"mapreduce.partition.factdistinctcolumnpartitioner.hll.shard.base";
+
+    public static void setHLLShard(Configuration conf, int hllShardBase) {
+        conf.setInt(HLL_SHARD_BASE_PROPERTY_NAME, hllShardBase);
+    }
+
+    private Configuration conf;
+    private int hllShardBase = 1;
 
     @Override
     public int getPartition(SelfDefineSortableKey skey, Text value, int 
numReduceTasks) {
         Text key = skey.getText();
         if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
-            // the last reducer is for merging hll
-            return numReduceTasks - 1;
+            // the last $hllShard reducers are for merging hll
+            Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); // cuboid id is read from the 8 bytes after the marker byte
+            int shard = cuboidId.hashCode() % hllShardBase; // NOTE(review): boxed Long is only needed for hashCode(); a primitive long would avoid the allocation
+            if (shard < 0) {
+                shard += hllShardBase;
+            }
+            return numReduceTasks - shard - 1; // shard 0 maps to the last reducer
         } else if (key.getBytes()[0] == 
FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
             // the last but one reducer is for partition col
-            return numReduceTasks - 2;
+            return numReduceTasks - hllShardBase - 1; // the reducer just before the hllShardBase HLL reducers (the comment above predates HLL sharding)
         } else {
             return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
     }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        hllShardBase = conf.getInt(HLL_SHARD_BASE_PROPERTY_NAME, 1); // defaults to 1 when the job did not call setHLLShard
+        logger.info("shard base for hll is " + hllShardBase);
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 08dadc9..dee384f 100755
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob {
             }
 
             setupMapper(segment);
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? 
reducerCount + 2 : reducerCount);
+            setupReducer(output, segment, statistics_enabled, reducerCount); // statistics reducers (partition col + HLL shards) are now added inside setupReducer
 
             attachCubeMetadata(cube, job.getConfiguration());
 
@@ -136,6 +136,15 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob {
 
     }
 
+    private int getHLLShardBase(CubeSegment segment) { // number of HLL reducers: ceil(nCuboids / cuboidsPerReducer), clamped to [1, maxReducers]
+        int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size();
+        int shardBase = (nCuboids - 1) / 
Math.max(1, segment.getConfig().getFactDistinctJobPerReducerHLLCuboidNumber()) + 1; // guard a non-positive config value against division by zero
+        if (shardBase > 
segment.getConfig().getFactDistinctJobHLLMaxReducerNumber()) {
+            shardBase = 
Math.max(1, segment.getConfig().getFactDistinctJobHLLMaxReducerNumber()); // keep at least one HLL reducer: the partitioner computes "% hllShardBase" and the reducer rejects values <= 0
+        }
+        return shardBase;
+    }
+
     private void setupMapper(CubeSegment cubeSeg) throws IOException {
         IMRTableInputFormat flatTableInputFormat = 
MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
@@ -146,7 +155,14 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob {
         job.setMapOutputValueClass(Text.class);
     }
 
-    private void setupReducer(Path output, int numberOfReducers) throws 
IOException {
+    private void setupReducer(Path output, CubeSegment cubeSeg, String 
statistics_enabled, int reducerCount)
+            throws IOException {
+        int numberOfReducers = reducerCount;
+        if ("true".equalsIgnoreCase(statistics_enabled)) {
+            int hllShardBase = getHLLShardBase(cubeSeg);
+            FactDistinctColumnPartitioner.setHLLShard(job.getConfiguration(), 
hllShardBase); // read back by FactDistinctColumnPartitioner.setConf and by the reducer's setup
+            numberOfReducers += (1 + hllShardBase); // one partition-col reducer plus hllShardBase HLL reducers after the column reducers
+        }
         job.setReducerClass(FactDistinctColumnsReducer.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 0f65a3e..a733430 100755
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -107,24 +107,38 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
         uhcReducerCount = cube.getConfig().getUHCReducerCount();
         initReducerIdToColumnIndex(config);
 
-        if (collectStatistics && (taskId == numberOfTasks - 1)) {
-            // hll
-            isStatistics = true;
-            baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
-            baseCuboidRowCountInMappers = Lists.newArrayList();
-            cuboidHLLMap = Maps.newHashMap();
-            samplingPercentage = 
Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-            logger.info("Reducer " + taskId + " handling stats");
-        } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
-            // partition col
-            isPartitionCol = true;
-            col = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (col == null) {
-                logger.info("No partition col. This reducer will do nothing");
+        boolean ifCol = true; // true when this task falls through to the "normal col" branch below
+        if (collectStatistics) {
+            int hllShardBase = 
conf.getInt(FactDistinctColumnPartitioner.HLL_SHARD_BASE_PROPERTY_NAME, 0); // written by FactDistinctColumnsJob via FactDistinctColumnPartitioner.setHLLShard
+            if (hllShardBase <= 0) {
+                throw new IllegalArgumentException("In job configuration the 
value for property "
+                        + 
FactDistinctColumnPartitioner.HLL_SHARD_BASE_PROPERTY_NAME + " is " + 
hllShardBase
+                        + ". It should be set correctly!!!");
+            }
+            ifCol = false;
+            if (taskId >= numberOfTasks - hllShardBase) { // the last hllShardBase tasks merge HLL counters
+                // hll
+                isStatistics = true;
+                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+                samplingPercentage = Integer
+                        
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+                logger.info("Reducer " + taskId + " handling stats");
+            } else if (taskId == numberOfTasks - hllShardBase - 1) { // the task just before the HLL tasks
+                // partition col
+                isPartitionCol = true;
+                col = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+                if (col == null) {
+                    logger.info("No partition col. This reducer will do 
nothing");
+                } else {
+                    logger.info("Reducer " + taskId + " handling partition col 
" + col.getIdentity());
+                }
             } else {
-                logger.info("Reducer " + taskId + " handling partition col " + 
col.getIdentity());
+                ifCol = true;
             }
-        } else {
+        }
+        if (ifCol) {
             // normal col
             col = columnList.get(reducerIdToColumnIndex.get(taskId));
             Preconditions.checkNotNull(col);
@@ -291,7 +305,7 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
     }
 
     private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws 
IOException {
-        logger.info("Total cuboid number: \t" + allCuboids.size());
+        logger.info("Cuboid number for task: " + taskId + "\t" + 
allCuboids.size()); // per-task count; SaveStatisticsStep logs the total across tasks at debug level
         logger.info("Samping percentage: \t" + samplingPercentage);
         logger.info("The following statistics are collected based on sampling 
data.");
         logger.info("Number of Mappers: " + 
baseCuboidRowCountInMappers.size());
@@ -308,11 +322,8 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<SelfDefineSortableK
             logger.info("Cuboid " + i + " row count is: \t " + 
cuboidHLLMap.get(i).getCountEstimate());
         }
 
-        logger.info("Sum of all the cube segments (before merge) is: \t " + 
totalRowsBeforeMerge);
-        logger.info("After merge, the cube has row count: \t " + grantTotal);
-        if (grantTotal > 0) {
-            logger.info("The mapper overlap ratio is: \t" + 
totalRowsBeforeMerge / grantTotal);
-        }
+        logger.info("Sum of row counts (before merge) is: \t " + 
totalRowsBeforeMerge);
+        logger.info("After merge, the row count: \t " + grantTotal); // per-task values; the overlap ratio is now computed across all tasks in SaveStatisticsStep
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 2196f09..f69bf67 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -19,25 +19,40 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Save the cube segment statistic to Kylin metadata store
  */
@@ -56,14 +71,79 @@ public class SaveStatisticsStep extends AbstractExecutable {
 
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
+
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(); // NOTE(review): previously HadoopUtil.getFileSystem(statisticsDir); confirm the statistics dir always lives on the working file system
+            Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
             Path statisticsDir = new 
Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
-            FileSystem fs = HadoopUtil.getFileSystem(statisticsDir);
-            Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, 
statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
-            if (statisticsFilePath == null) {
+            Path[] statisticsFiles = HadoopUtil.getFilterPath(fs, 
statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
+            if (statisticsFiles == null || statisticsFiles.length == 0) { // getFilterPath returns null for a missing dir but an empty array when nothing matches
                 throw new IOException("fail to find the statistics file in 
base dir: " + statisticsDir);
             }
 
-            FSDataInputStream is = fs.open(statisticsFilePath);
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            long totalRowsBeforeMerge = 0;
+            long grantTotal = 0;
+            int samplingPercentage = -1;
+            int mapperNumber = -1;
+            for (Path item : statisticsFiles) {
+                int pSamplingPercentage = 0;
+                double pMapperOverlapRatio = 0;
+                int pMapperNumber = 0;
+                long pGrantTotal = 0;
+                try (SequenceFile.Reader reader = new 
SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) {
+                    LongWritable key = (LongWritable) 
ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
+                    BytesWritable value = (BytesWritable) 
ReflectionUtils.newInstance(reader.getValueClass(),
+                            hadoopConf);
+                    while (reader.next(key, value)) {
+                        if (key.get() == 0L) {
+                            pSamplingPercentage = 
Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -1L) {
+                            pMapperOverlapRatio = 
Bytes.toDouble(value.getBytes());
+                        } else if (key.get() == -2L) {
+                            pMapperNumber = Bytes.toInt(value.getBytes());
+                        } else {
+                            HLLCounter hll = new 
HLLCounter(kylinConf.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new 
ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+                            cuboidHLLMap.put(key.get(), hll); // presumably each cuboid id appears in only one file, since the partitioner routes it to a single HLL shard
+                            pGrantTotal += hll.getCountEstimate();
+                        }
+                    }
+                    totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio; // compound assignment implicitly truncates the double product to long
+                    grantTotal += pGrantTotal;
+                    if (pMapperNumber > 0) {
+                        if (mapperNumber < 0) {
+                            mapperNumber = pMapperNumber;
+                        } else {
+                            throw new RuntimeException(
+                                    "Base cuboid has been distributed to 
multiple reducers at step FactDistinctColumnsReducer!!!");
+                        }
+                    }
+                    if (samplingPercentage < 0) {
+                        samplingPercentage = pSamplingPercentage;
+                    } else if (samplingPercentage != pSamplingPercentage) {
+                        throw new RuntimeException(
+                                "The sampling percentage should be same among 
all of the reducer of FactDistinctColumnsReducer!!!");
+                    }
+                }
+            }
+            if (samplingPercentage < 0) {
+                logger.warn("The sampling percentage should be set!!!");
+            }
+            if (mapperNumber < 0) {
+                logger.warn("The mapper number should be set!!!");
+            }
+
+            if (logger.isDebugEnabled()) {
+                logMapperAndCuboidStatistics(cuboidHLLMap, samplingPercentage, 
mapperNumber, grantTotal,
+                        totalRowsBeforeMerge);
+            }
+            double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) 
totalRowsBeforeMerge / grantTotal;
+            CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, 
cuboidHLLMap, samplingPercentage,
+                    mapperNumber, mapperOverlapRatio);
+
+            Path statisticsFile = new Path(statisticsDir, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FSDataInputStream is = fs.open(statisticsFile);
             try {
                 // put the statistics to metadata store
                 String statisticsFileName = 
newSegment.getStatisticsResourcePath();
@@ -84,4 +164,23 @@ public class SaveStatisticsStep extends AbstractExecutable {
         }
     }
 
+    private void logMapperAndCuboidStatistics(Map<Long, HLLCounter> 
cuboidHLLMap, int samplingPercentage,
+            int mapperNumber, long grantTotal, long totalRowsBeforeMerge) 
throws IOException { // debug dump of the merged statistics; the caller invokes it only when debug logging is enabled
+        logger.debug("Total cuboid number: \t" + cuboidHLLMap.size());
+        logger.debug("Samping percentage: \t" + samplingPercentage);
+        logger.debug("The following statistics are collected based on sampling 
data.");
+        logger.debug("Number of Mappers: " + mapperNumber);
+
+        List<Long> allCuboids = Lists.newArrayList(cuboidHLLMap.keySet());
+        Collections.sort(allCuboids);
+        for (long i : allCuboids) { // cuboid ids in ascending order
+            logger.debug("Cuboid " + i + " row count is: \t " + 
cuboidHLLMap.get(i).getCountEstimate());
+        }
+
+        logger.debug("Sum of all the cube segments (before merge) is: \t " + 
totalRowsBeforeMerge);
+        logger.debug("After merge, the cube has row count: \t " + grantTotal);
+        if (grantTotal > 0) {
+            logger.debug("The mapper overlap ratio is: \t" + (double) 
totalRowsBeforeMerge / grantTotal);
+        }
+    }
 }

Reply via email to