KYLIN-2623 Move output (HBase) code to the output side

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a38b02df
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a38b02df
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a38b02df

Branch: refs/heads/KYLIN-2624
Commit: a38b02df0387541684aa1689e044927af650f1c7
Parents: edc4d4c
Author: Roger Shi <rogershijich...@hotmail.com>
Authored: Tue May 16 13:50:23 2017 +0800
Committer: liyang-gmt8 <liy...@apache.org>
Committed: Tue May 16 14:03:50 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  33 +++++-
 .../engine/mr/common/AbstractHadoopJob.java     |   6 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  37 +++----
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  55 ++--------
 .../engine/mr/steps/LayerReducerNumSizing.java  |  80 --------------
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  37 ++++---
 .../engine/mr/steps/MergeCuboidMapper.java      |  35 +-----
 .../kylin/engine/mr/steps/ReducerNumSizing.java | 106 +++++++++++++++++++
 .../apache/kylin/source/hive/HiveMRInput.java   |   2 -
 .../apache/kylin/source/kafka/KafkaMRInput.java |   1 -
 .../hbase/steps/HBaseMROutput2Transition.java   |  97 +++++++++++++++++
 11 files changed, 284 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 603f207..69bba0a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -18,11 +18,14 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-import java.util.List;
-
 public interface IMROutput2 {
 
     /** Return a helper to participate in batch cubing job flow. */
@@ -53,6 +56,19 @@ public interface IMROutput2 {
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+        public IMROutputFormat getOuputFormat();
+
+    }
+
+    public interface IMROutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        public void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception;
+
     }
 
     /** Return a helper to participate in batch merge job flow. */
@@ -82,6 +98,19 @@ public interface IMROutput2 {
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+        public IMRMergeOutputFormat getOuputFormat();
+    }
+
+    public interface IMRMergeOutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        public void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+        public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
     }
 
 }
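
For illustration, a minimal sketch of what an IMROutputFormat implementation for some other storage could look like, assuming cuboid files are exchanged as sequence files on HDFS; the class name and the fixed reducer count are hypothetical and not part of this commit:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2;

// Hypothetical storage-side implementation of the new output hooks.
public class MyStorageMROutputFormat implements IMROutput2.IMROutputFormat {

    @Override
    public void configureJobInput(Job job, String input) throws Exception {
        // cuboid files from the previous layer come back as sequence files
        job.setInputFormatClass(SequenceFileInputFormat.class);
    }

    @Override
    public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
        Path outputPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(1); // storage-specific reducer sizing would go here
        HadoopUtil.deletePath(job.getConfiguration(), outputPath);
    }
}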

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 44686d6..764cbdd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -559,7 +559,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         HadoopUtil.deletePath(conf, path);
     }
 
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+    public static double getTotalMapInputMB(Job job) throws ClassNotFoundException, IOException, InterruptedException, JobException {
         if (job == null) {
             throw new JobException("Job is null");
         }
@@ -576,6 +576,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return totalMapInputMB;
     }
 
+    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        return getTotalMapInputMB(job);
+    }
+
     protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
         if (job == null) {
             throw new JobException("Job is null");
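
Since getTotalMapInputMB(Job) is now static, output-side code that only holds a Job reference can compute the map input size without subclassing AbstractHadoopJob. A minimal sketch (the wrapper class name is hypothetical):

import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

public class MapInputSizeSketch {
    // e.g. feed the result into ReducerNumSizing.getLayeredCubingReduceTaskNum(...)
    static double mapInputMB(Job job) throws Exception {
        return AbstractHadoopJob.getTotalMapInputMB(job);
    }
}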

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index b2e186d..6a8ba4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -18,24 +18,19 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -84,11 +79,10 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
-            options.addOption(OPTION_INPUT_FORMAT);
             options.addOption(OPTION_CUBING_JOB_ID);
             parseOptions(options, args);
 
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
@@ -109,8 +103,10 @@ public class CuboidJob extends AbstractHadoopJob {
 
             setJobClasspath(job, cube.getConfig());
 
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+
             // Mapper
-            configureMapperInputFormat(segment);
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -118,22 +114,20 @@ public class CuboidJob extends AbstractHadoopJob {
 
             // Reducer
             job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
-            FileOutputFormat.setOutputPath(job, output);
+            // set input
+            configureMapperInputFormat(segment);
+
+            // set output
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
+            outputFormat.configureJobOutput(job, output, segment, nCuboidLevel);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
-            // add metadata to distributed cache
-            attachSegmentMetadataWithDict(segment, job.getConfiguration());
-
-            job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(segment, getTotalMapInputMB(), nCuboidLevel));
-
-            this.deletePath(job.getConfiguration(), output);
 
             return waitForCompletion(job);
         } finally {
@@ -142,7 +136,7 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
-    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws Exception {
         String input = getOptionValue(OPTION_INPUT_PATH);
 
         if ("FLAT_TABLE".equals(input)) {
@@ -151,12 +145,9 @@ public class CuboidJob extends AbstractHadoopJob {
             flatTableInputFormat.configureJob(job);
         } else {
             // n-dimension cuboid case
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
+            outputFormat.configureJobInput(job, input);
             FileInputFormat.setInputPaths(job, new Path(input));
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                job.setInputFormatClass(TextInputFormat.class);
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
         }
     }
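
Note the ordering in run() above: the mapper and reducer classes are set on the job before configureJobOutput() is invoked, because the HBase implementation of the hook (see HBaseMROutput2Transition below) inspects job.getMapperClass() to choose between layered and in-memory reducer sizing. Any other implementation of IMROutputFormat would presumably need to tolerate the same call order.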
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 7706bac..73a2eb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -18,29 +18,21 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,28 +99,24 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-            flatTableInputFormat.configureJob(job);
-
             // set mapper
             job.setMapperClass(InMemCuboidMapper.class);
             job.setMapOutputKeyClass(ByteArrayWritable.class);
             job.setMapOutputValueClass(ByteArrayWritable.class);
 
-            // set output
-            job.setReducerClass(InMemCuboidReducer.class);
-            job.setNumReduceTasks(calculateReducerNum(segment));
-
+            // set reducer
+            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setReducerClass(InMemCuboidReducer.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
-            Path outputPath = new Path(output);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            // set input
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
 
-            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+            // set output
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
+            outputFormat.configureJobOutput(job, output, segment, 0);
 
             return waitForCompletion(job);
         } finally {
@@ -137,31 +125,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         }
     }
 
-    private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
-        KylinConfig kylinConfig = cubeSeg.getConfig();
-
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
-        double totalSizeInM = 0;
-        for (Double cuboidSize : cubeSizeMap.values()) {
-            totalSizeInM += cuboidSize;
-        }
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-        logger.info("Having per reduce MB " + perReduceInputMB);
-        logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
-        return numReduceTasks;
-    }
-
     public static void main(String[] args) throws Exception {
         InMemCuboidJob job = new InMemCuboidJob();
         int exitCode = ToolRunner.run(job, args);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
deleted file mode 100644
index 7ce9842..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *  
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LayerReducerNumSizing {
-
-    private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class);
-
-    public static int getReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
-        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
-        if (level == -1) {
-            //merge case
-            double estimatedSize = cubeStatsReader.estimateCubeSize();
-            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-            logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
-        } else if (level == 0) {
-            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else {
-            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-        }
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        return numReduceTasks;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 84b76e3..63d0619 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -19,15 +19,14 @@
 package org.apache.kylin.engine.mr.steps;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
 public class MergeCuboidJob extends CuboidJob {
@@ -44,11 +43,14 @@ public class MergeCuboidJob extends CuboidJob {
             options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
 
             // start job
             String jobName = getOptionValue(OPTION_JOB_NAME);
@@ -57,35 +59,32 @@ public class MergeCuboidJob extends CuboidJob {
 
             setJobClasspath(job, cube.getConfig());
 
-            // set inputs
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
+            // add metadata to distributed cache
+            // TODO actually only dictionaries from merging segments are needed
+            attachCubeMetadataWithDict(cube, job.getConfiguration());
 
             // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
             job.setMapperClass(MergeCuboidMapper.class);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
 
+            // Reducer
             job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            // set inputs
+            IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOuputFormat();
+            outputFormat.configureJobInput(job, input);
+            addInputDirs(input, job);
+
+            // set output
+            outputFormat.configureJobOutput(job, output, cubeSeg);
+
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            // add metadata to distributed cache
-            // TODO actually only dictionaries from merging segments are needed
-            attachCubeMetadataWithDict(cube, job.getConfiguration());
-
-            job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(cube.getSegmentById(segmentID), getTotalMapInputMB(), -1));
-
-            this.deletePath(job.getConfiguration(), output);
-
             return waitForCompletion(job);
         } finally {
             if (job != null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index fccd48a..a603fc8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -44,7 +42,9 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
@@ -110,7 +110,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         // decide which source segment
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        sourceCubeSegment = findSourceSegment(fileSplit, cube);
+        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
+        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
@@ -146,34 +147,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         }
     }
 
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
-    public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
-        String filePath = fileSplit.getPath().toString();
-        String jobID = extractJobIDFromPath(filePath);
-        return findSegmentWithUuid(jobID, cube);
-    }
-
-    private static String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurrence
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Can not extract job ID from file 
path : " + path);
-        }
-    }
-
-    private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-        throw new IllegalStateException("No merging segment's last build job 
ID equals " + jobID);
-    }
-
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         long cuboidID = rowKeySplitter.split(key.getBytes());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
new file mode 100644
index 0000000..eb1adad
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReducerNumSizing {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class);
+
+    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        return numReduceTasks;
+    }
+
+    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg) throws IOException {
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
+}
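
As a worked example of the layered sizing formula above, with a hypothetical adjusted layer size of 1000 MB, a 500 MB per-reducer input target and a count ratio of 1.0:

public class ReducerSizingExample {
    public static void main(String[] args) {
        double adjustedCurrentLayerSizeEst = 1000; // hypothetical estimate, in MB
        double perReduceInputMB = 500;             // per-reducer input target
        double reduceCountRatio = 1.0;             // reducer count ratio
        // same rounding as getLayeredCubingReduceTaskNum()
        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
        System.out.println(numReduceTasks); // 3; quadrupled for memory-hungry measures, then clamped to min/max
    }
}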

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d7a2c7e..6c542ab 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -112,8 +112,6 @@ public class HiveMRInput implements IMRInput {
 
                 HCatInputFormat.setInput(job, dbName, tableName);
                 job.setInputFormatClass(HCatInputFormat.class);
-
-                job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 500e1e9..4c140be 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -117,7 +117,6 @@ public class KafkaMRInput implements IMRInput {
         @Override
         public void configureJob(Job job) {
             job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapOutputValueClass(Text.class);
             String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
             IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
             String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));

http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index c4df354..31cb189 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -19,10 +19,25 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
+import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
+import org.apache.kylin.engine.mr.steps.ReducerNumSizing;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,9 +77,38 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                 // nothing to do
             }
+
+            @Override
+            public IMROutputFormat getOuputFormat() {
+                return new HBaseMROutputFormat();
+            }
         };
     }
 
+    public static class HBaseMROutputFormat implements IMROutputFormat {
+
+        @Override
+        public void configureJobInput(Job job, String input) throws Exception {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        @Override
+        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
+            int reducerNum = 1;
+            Class mapperClass = job.getMapperClass();
+            if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
+                reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level);
+            } else if (mapperClass == InMemCuboidMapper.class) {
+                reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment);
+            }
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setNumReduceTasks(reducerNum);
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+        }
+    }
+
     @Override
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
         return new IMRBatchMergeOutputSide2() {
@@ -86,6 +130,59 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
                 steps.addMergingGarbageCollectionSteps(jobFlow);
             }
+
+            @Override
+            public IMRMergeOutputFormat getOuputFormat() {
+                return new HBaseMergeMROutputFormat();
+            }
         };
     }
+
+    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat{
+
+        private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+        @Override
+        public void configureJobInput(Job job, String input) throws Exception {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        @Override
+        public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception {
+            int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1);
+            job.setNumReduceTasks(reducerNum);
+
+            Path outputPath = new Path(output);
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        }
+
+        @Override
+        public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+            String filePath = fileSplit.getPath().toString();
+            String jobID = extractJobIDFromPath(filePath);
+            return findSegmentWithUuid(jobID, cube);
+        }
+
+        private static String extractJobIDFromPath(String path) {
+            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+            // check the first occurrence
+            if (matcher.find()) {
+                return matcher.group(1);
+            } else {
+                throw new IllegalStateException("Can not extract job ID from 
file path : " + path);
+            }
+        }
+
+        private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+            for (CubeSegment segment : cubeInstance.getSegments()) {
+                String lastBuildJobID = segment.getLastBuildJobID();
+                if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                    return segment;
+                }
+            }
+            throw new IllegalStateException("No merging segment's last build 
job ID equals " + jobID);
+        }
+    }
 }
\ No newline at end of file
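
The merge output format resolves each input split back to its source segment by pulling the build job's UUID out of the HDFS path. A standalone sketch of that extraction, with a made-up path and UUID:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdExtractionSketch {
    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    public static void main(String[] args) {
        // hypothetical cuboid file path written by an earlier build job
        String path = "/kylin/kylin_metadata/kylin-0a8d71e8-1234-4abc-9def-001122334455/cube_name/cuboid/part-r-00000";
        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // the job UUID used by findSegmentWithUuid()
        }
    }
}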
