APACHE-KYLIN-2866: Enlarge the reducer number for hyperloglog statistics 
calculation at step CalculateStatsFromBaseCuboidJob

Signed-off-by: lidongsjtu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5425deba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5425deba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5425deba

Branch: refs/heads/master
Commit: 5425debae98f6397bf05f5459416a57d8feb14da
Parents: 377ec49
Author: Zhong <[email protected]>
Authored: Thu Sep 28 19:12:07 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/common/CubeStatsWriter.java | 25 +++++++--
 .../steps/CalculateStatsFromBaseCuboidJob.java  | 10 +++-
 ...CalculateStatsFromBaseCuboidPartitioner.java | 59 ++++++++++++++++++++
 .../CalculateStatsFromBaseCuboidReducer.java    |  7 ++-
 .../mr/steps/MergeStatisticsWithOldStep.java    | 29 +++++++---
 5 files changed, 111 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index 8f400c3..b1e59a7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -20,7 +20,6 @@ package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +34,8 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 
+import com.google.common.collect.Lists;
+
 public class CubeStatsWriter {
 
     public static void writeCuboidStatistics(Configuration conf, Path 
outputPath, //
@@ -45,17 +46,32 @@ public class CubeStatsWriter {
     public static void writeCuboidStatistics(Configuration conf, Path 
outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int 
mapperNumber, double mapperOverlapRatio) throws IOException {
         Path seqFilePath = new Path(outputPath, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, 
samplingPercentage, mapperNumber,
+                mapperOverlapRatio);
+    }
+
+    public static void writePartialCuboidStatistics(Configuration conf, Path 
outputPath, //
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int 
mapperNumber, double mapperOverlapRatio,
+            int shard) throws IOException {
+        Path seqFilePath = new Path(outputPath, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME + "_" + shard);
+        writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, 
samplingPercentage, mapperNumber,
+                mapperOverlapRatio);
+    }
 
-        List<Long> allCuboids = new ArrayList<Long>();
+    private static void writeCuboidStatisticsInner(Configuration conf, Path 
outputFilePath, //
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int 
mapperNumber, double mapperOverlapRatio)
+            throws IOException {
+        List<Long> allCuboids = Lists.newArrayList();
         allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
 
         ByteBuffer valueBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-        SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
SequenceFile.Writer.file(seqFilePath), 
SequenceFile.Writer.keyClass(LongWritable.class), 
SequenceFile.Writer.valueClass(BytesWritable.class));
+        SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
SequenceFile.Writer.file(outputFilePath),
+                SequenceFile.Writer.keyClass(LongWritable.class), 
SequenceFile.Writer.valueClass(BytesWritable.class));
         try {
             // mapper overlap ratio at key -1
             writer.append(new LongWritable(-1), new 
BytesWritable(Bytes.toBytes(mapperOverlapRatio)));
-            
+
             // mapper number at key -2
             writer.append(new LongWritable(-2), new 
BytesWritable(Bytes.toBytes(mapperNumber)));
 
@@ -72,5 +88,4 @@ public class CubeStatsWriter {
             IOUtils.closeQuietly(writer);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index b60076c..8f64272 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -35,6 +35,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +78,7 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             setupMapper(input);
-            setupReducer(output, 1);
+            setupReducer(output, cubeSegment);
 
             attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration());
 
@@ -101,12 +102,15 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
         job.setMapOutputValueClass(Text.class);
     }
 
-    private void setupReducer(Path output, int numberOfReducers) throws 
IOException {
+    private void setupReducer(Path output, CubeSegment cubeSeg) throws 
IOException {
+        int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg);
+        job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, 
hllShardBase);
+
         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(Text.class);
-        job.setNumReduceTasks(numberOfReducers);
+        job.setNumReduceTasks(hllShardBase);
 
         FileOutputFormat.setOutputPath(job, output);
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, 
output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
new file mode 100644
index 0000000..70db21b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class CalculateStatsFromBaseCuboidPartitioner extends Partitioner<Text, 
Text> implements Configurable {
+    private static final Logger logger = 
LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class);
+
+    private Configuration conf;
+    private int hllShardBase = 1;
+
+    @Override
+    public int getPartition(Text key, Text value, int numReduceTasks) {
+        Long cuboidId = Bytes.toLong(key.getBytes());
+        int shard = cuboidId.hashCode() % hllShardBase;
+        if (shard < 0) {
+            shard += hllShardBase;
+        }
+        return numReduceTasks - shard - 1;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1);
+        logger.info("shard base for hll is " + hllShardBase);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
index 489dac4..7210622 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
@@ -55,6 +55,8 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text
     private String output = null;
     private int samplingPercentage;
 
+    private int taskId;
+
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -72,6 +74,7 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text
         samplingPercentage = Integer
                 
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
 
+        taskId = context.getTaskAttemptID().getTaskID().getId();
         cuboidHLLMap = Maps.newHashMap();
     }
 
@@ -106,7 +109,7 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text
         }
         double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) 
totalRowsBeforeMerge / grandTotal;
 
-        CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new 
Path(output), //
-                cuboidHLLMap, samplingPercentage, 
baseCuboidRowCountInMappers.size(), mapperOverlapRatio);
+        
CubeStatsWriter.writePartialCuboidStatistics(context.getConfiguration(), new 
Path(output), //
+                cuboidHLLMap, samplingPercentage, 
baseCuboidRowCountInMappers.size(), mapperOverlapRatio, taskId);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index e97c6bb..7855c06 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -63,7 +63,6 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = 
mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment optimizeSegment = 
cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-        final String statsInputPath = 
CubingExecutableUtil.getStatisticsPath(this.getParams());
 
         CubeSegment oldSegment = 
optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
         Preconditions.checkNotNull(oldSegment,
@@ -76,16 +75,28 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
 
         try {
             //1. Add statistics from optimized segment
-            Path statisticsFilePath = new Path(statsInputPath,
-                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            Path statisticsDirPath = new 
Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
             FileSystem hdfs = FileSystem.get(conf);
-            if (!hdfs.exists(statisticsFilePath))
-                throw new IOException("File " + statisticsFilePath + " does 
not exists");
+            if (!hdfs.exists(statisticsDirPath)) {
+                throw new IOException("StatisticsFilePath " + 
statisticsDirPath + " does not exists");
+            }
+
+            if (!hdfs.isDirectory(statisticsDirPath)) {
+                throw new IOException("StatisticsFilePath " + 
statisticsDirPath + " is not a directory");
+            }
 
-            CubeStatsReader optimizeSegmentStatsReader = new 
CubeStatsReader(optimizeSegment, null,
-                    optimizeSegment.getConfig(), statisticsFilePath);
-            averageSamplingPercentage += 
optimizeSegmentStatsReader.getSamplingPercentage();
-            addFromCubeStatsReader(optimizeSegmentStatsReader);
+            Path[] statisticsFiles = HadoopUtil.getFilterPath(hdfs, 
statisticsDirPath,
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            if (statisticsFiles == null) {
+                throw new IOException("fail to find the statistics file in 
base dir: " + statisticsDirPath);
+            }
+
+            for (Path item : statisticsFiles) {
+                CubeStatsReader optimizeSegmentStatsReader = new 
CubeStatsReader(optimizeSegment, null,
+                        optimizeSegment.getConfig(), item);
+                averageSamplingPercentage += 
optimizeSegmentStatsReader.getSamplingPercentage();
+                addFromCubeStatsReader(optimizeSegmentStatsReader);
+            }
 
             //2. Add statistics from old segment
             CubeStatsReader oldSegmentStatsReader = new 
CubeStatsReader(oldSegment, null, oldSegment.getConfig());

Reply via email to