KYLIN-1560 Make BatchCubingJobBuilder2 easier to add additional step Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/be6e2065 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/be6e2065 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/be6e2065
Branch: refs/heads/1.5.x-HBase1.1.3 Commit: be6e2065a853f71a2e898ac9cf3c4144ed6fb8fd Parents: 9a8153a Author: shaofengshi <[email protected]> Authored: Wed Apr 6 14:46:55 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Wed Apr 6 14:55:55 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfigBase.java | 9 +++++++++ .../org/apache/kylin/engine/EngineFactory.java | 5 +++-- .../kylin/engine/mr/BatchCubingJobBuilder.java | 13 ------------- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 19 +++++-------------- .../kylin/engine/mr/BatchMergeJobBuilder2.java | 5 +++++ .../kylin/engine/mr/JobBuilderSupport.java | 14 ++++++++++++++ .../engine/mr/steps/BaseCuboidMapperBase.java | 19 ++++++++++++++----- .../engine/mr/steps/HiveToBaseCuboidMapper.java | 8 -------- 8 files changed, 50 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9db3081..2c0b353 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -631,4 +631,13 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.init.tasks"); } + public String getMRBatchEngineV1Class() { + return getOptional("kylin.cube.mr.engine.v1.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine"); + } + + public String getMRBatchEngineV2Class() { + return getOptional("kylin.cube.mr.engine.v2.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 919ede6..bba9060 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -24,6 +24,7 @@ import static org.apache.kylin.metadata.model.IEngineAware.ID_MR_V2; import java.util.HashMap; import java.util.Map; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -35,8 +36,8 @@ public class EngineFactory { private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines; static { Map<Integer, String> impls = new HashMap<>(); - impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); - impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); + impls.put(ID_MR_V1, KylinConfig.getInstanceFromEnv().getMRBatchEngineV1Class()); + impls.put(ID_MR_V2, KylinConfig.getInstanceFromEnv().getMRBatchEngineV2Class()); batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class); impls.clear(); http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 45d03d1..7f729a6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -126,17 +126,4 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { return ndCuboidStep; } - private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { - String[] paths = new String[groupRowkeyColumnsCount + 1]; - for (int i = 0; i <= groupRowkeyColumnsCount; i++) { - int dimNum = totalRowkeyColumnCount - i; - if (dimNum == totalRowkeyColumnCount) { - paths[i] = cuboidRootPath + "base_cuboid"; - } else { - paths[i] = cuboidRootPath + dimNum + "d_cuboid"; - } - } - return paths; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 5f4a3ed..0b1bd90 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -59,6 +59,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { result.addTask(createFactDistinctColumnsStepWithStats(jobId)); result.addTask(createBuildDictionaryStep(jobId)); result.addTask(createSaveStatisticsStep(jobId)); + addOtherStepBeforeCubing(result); outputSide.addStepPhase2_BuildDictionary(result); // Phase 3: Build Cube @@ -98,6 +99,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } + protected void addOtherStepBeforeCubing(CubingJob result) { + + } + private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { // base cuboid job MapReduceExecutable cubeStep = new MapReduceExecutable(); @@ -162,18 +167,4 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); return ndCuboidStep; } - - private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { - String[] paths = new String[groupRowkeyColumnsCount + 1]; - for (int i = 0; i <= groupRowkeyColumnsCount; i++) { - int dimNum = totalRowkeyColumnCount - i; - if (dimNum == totalRowkeyColumnCount) { - paths[i] = cuboidRootPath + "base_cuboid"; - } else { - paths[i] = cuboidRootPath + dimNum + "d_cuboid"; - } - } - return paths; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index e151674..08ddaf8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -64,6 +64,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { // Phase 1: Merge Dictionary result.addTask(createMergeDictionaryStep(mergingSegmentIds)); result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId))); + addOtherStepBeforeMerge(result); outputSide.addStepPhase1_MergeDictionary(result); // Phase 2: Merge Cube Files @@ -107,4 +108,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { return mergeCuboidDataStep; } + protected void addOtherStepBeforeMerge(CubingJob result) { + + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index a1a0cf3..7463fe0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -187,4 +187,18 @@ public class JobBuilderSupport { return buf.append(" -").append(paraName).append(" ").append(paraValue); } + public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { + String[] paths = new String[groupRowkeyColumnsCount + 1]; + for (int i = 0; i <= groupRowkeyColumnsCount; i++) { + int dimNum = totalRowkeyColumnCount - i; + if (dimNum == totalRowkeyColumnCount) { + paths[i] = cuboidRootPath + "base_cuboid"; + } else { + paths[i] = cuboidRootPath + dimNum + "d_cuboid"; + } + } + return paths; + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index bc664aa..a1eeb1b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -56,7 +57,7 @@ import com.google.common.collect.Lists; /** */ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { - protected static final Logger logger = LoggerFactory.getLogger(HiveToBaseCuboidMapper.class); + protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class); public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); public static final byte[] ONE = Bytes.toBytes("1"); protected String cubeName; @@ -78,8 +79,8 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL protected AbstractRowKeyEncoder rowKeyEncoder; protected MeasureCodec measureCodec; private int errorRecordCounter; - private Text outputKey = new Text(); - private Text outputValue = new Text(); + protected Text outputKey = new Text(); + protected Text outputValue = new Text(); private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); @Override @@ -132,7 +133,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL } } - private boolean isNull(byte[] v) { + protected boolean isNull(byte[] v) { for (byte[] nullByte : nullBytes) { if (Bytes.equals(v, nullByte)) return true; @@ -140,7 +141,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL return false; } - private byte[] buildKey(SplittedBytes[] splitBuffers) { + protected byte[] buildKey(SplittedBytes[] splitBuffers) { int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); for (int i = 0; i < baseCuboid.getColumns().size(); i++) { int index = rowKeyColumnIndexes[i]; @@ -207,6 +208,14 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL context.write(outputKey, outputValue); } + protected byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException { + byte[][] result = new byte[row.length][]; + for (int i = 0; i < row.length; i++) { + result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8"); + } + return result; + } + protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException { logger.error("Insane record: " + bytesSplitter, ex); http://git-wip-us.apache.org/repos/asf/kylin/blob/be6e2065/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index 8f5557d..96e8030 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -57,12 +57,4 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O } } - private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException { - byte[][] result = new byte[row.length][]; - for (int i = 0; i < row.length; i++) { - result[i] = row[i] == null ? HIVE_NULL : row[i].getBytes("UTF-8"); - } - return result; - } - }
