Minor refactor to make the builder more extensible
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23d73ef3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23d73ef3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23d73ef3 Branch: refs/heads/master Commit: 23d73ef3bee464dec5abaf2188d86598f8493178 Parents: 80bb6f7 Author: Hongbin Ma <[email protected]> Authored: Tue Jul 12 11:38:58 2016 +0800 Committer: Hongbin Ma <[email protected]> Committed: Tue Jul 12 11:39:34 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeInstance.java | 1 + .../kylin/engine/mr/BatchCubingJobBuilder2.java | 19 ++++++++++++++++--- .../kylin/engine/mr/BatchMergeJobBuilder2.java | 8 +++++++- .../kylin/engine/mr/steps/MergeCuboidJob.java | 1 - .../kylin/engine/mr/steps/MergeCuboidMapper.java | 2 +- 5 files changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 6820a60..ce12ac8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -305,6 +305,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return result; } + public CubeSegment getSegment(String name, SegmentStatusEnum status) { for (CubeSegment segment : segments) { http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 9c4ddb2..afa601c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.RowKeyDesc; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.BaseCuboidJob; @@ -115,11 +116,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); cubeStep.setMapReduceParams(cmd.toString()); - cubeStep.setMapReduceJobClass(InMemCuboidJob.class); + cubeStep.setMapReduceJobClass(getInMemCuboidJob()); cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES); return cubeStep; } + protected Class<? 
extends AbstractHadoopJob> getInMemCuboidJob() { + return InMemCuboidJob.class; + } + private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) { // base cuboid job MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); @@ -138,11 +143,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); baseCuboidStep.setMapReduceParams(cmd.toString()); - baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); + baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob()); baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); return baseCuboidStep; } + protected Class<? extends AbstractHadoopJob> getBaseCuboidJob() { + return BaseCuboidJob.class; + } + private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount, String jobId) { // ND cuboid job MapReduceExecutable ndCuboidStep = new MapReduceExecutable(); @@ -160,7 +169,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); ndCuboidStep.setMapReduceParams(cmd.toString()); - ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); + ndCuboidStep.setMapReduceJobClass(getNDCuboidJob()); return ndCuboidStep; } + + protected Class<? 
extends AbstractHadoopJob> getNDCuboidJob() { + return NDCuboidJob.class; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index f1d7281..06b7528 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -103,8 +104,13 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); mergeCuboidDataStep.setMapReduceParams(cmd.toString()); - mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); + mergeCuboidDataStep.setMapReduceJobClass(getMergeCuboidJob()); return mergeCuboidDataStep; } + protected Class<? 
extends AbstractHadoopJob> getMergeCuboidJob() { + return MergeCuboidJob.class; + + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index be1a1c1..5546bce 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -69,7 +69,6 @@ public class MergeCuboidJob extends CuboidJob { job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); - // Reducer - only one job.setReducerClass(CuboidReducer.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/23d73ef3/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index bacc77b..5fd321c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -143,7 +143,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); - public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) { + public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) { String filePath = 
fileSplit.getPath().toString(); String jobID = extractJobIDFromPath(filePath); return findSegmentWithUuid(jobID, cube);
