Repository: kylin Updated Branches: refs/heads/yang21-hbase1.x eb6c3b064 -> f6735a811 (forced update)
minor, mr code clean up Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f5e61980 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f5e61980 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f5e61980 Branch: refs/heads/yang21-hbase1.x Commit: f5e619802c54aaca03a1771c9a38af33f6adf50c Parents: b423384 Author: Yang Li <liy...@apache.org> Authored: Fri Nov 11 22:51:23 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Fri Nov 11 22:51:23 2016 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/kylin/engine/mr/KylinMapper.java | 6 ++++-- .../main/java/org/apache/kylin/engine/mr/KylinReducer.java | 6 ++++-- .../java/org/apache/kylin/engine/mr/steps/CuboidJob.java | 8 +++----- .../org/apache/kylin/engine/mr/steps/InMemCuboidJob.java | 3 ++- 4 files changed, 13 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java index a527b3d..a01f7a2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; /** */ -abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { +public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class); protected void bindCurrentConfiguration(Configuration conf) { @@ -54,7 +54,9 @@ abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapp } } - abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException; + protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + super.map(key, value, context); + } @Override final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java index 2987032..2b63ce0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; /** */ -abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { +public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class); protected void bindCurrentConfiguration(Configuration conf) { @@ -53,7 +53,9 @@ abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Red } } - abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException; + protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + super.reduce(key, values, context); + } @Override final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 6b0c86e..9486e60 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -21,11 +21,11 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory; public class CuboidJob extends AbstractHadoopJob { protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class); - private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"; @SuppressWarnings("rawtypes") private Class<? extends Mapper> mapperClass; @@ -165,7 +164,6 @@ public class CuboidJob extends AbstractHadoopJob { } protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { - Configuration jobConf = job.getConfiguration(); KylinConfig kylinConfig = cubeDesc.getConfig(); double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); @@ -200,12 +198,12 @@ public class CuboidJob extends AbstractHadoopJob { // no more than 500 reducer by default numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks); + job.setNumReduceTasks(numReduceTasks); logger.info("Having total map input MB " + Math.round(totalMapInputMB)); logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids); logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio); - logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks); + logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks); } /** http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 013f2c9..d9558a4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -25,6 +25,7 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; @@ -157,7 +158,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { logger.info("Having total map input MB " + Math.round(totalSizeInM)); logger.info("Having per reduce MB " + perReduceInputMB); - logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks); + logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks); return numReduceTasks; }