KYLIN-1859 Use segment "uuid" instead of "name" to seek a segment across the system
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a35dc3cb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a35dc3cb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a35dc3cb

Branch: refs/heads/master
Commit: a35dc3cb6b4e5f6a89001c146648200a15456443
Parents: 76d8672
Author: shaofengshi <shaofeng...@apache.org>
Authored: Fri Jul 8 13:29:35 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Aug 10 10:31:42 2016 +0800

----------------------------------------------------------------------
.../kylin/job/streaming/KafkaDataLoader.java | 2 +-
.../java/org/apache/kylin/cube/CubeManager.java | 23 ++++++++++++++++----
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 5 ++---
.../cube/model/CubeJoinedFlatTableDesc.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder.java | 4 ++--
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 6 ++---
.../kylin/engine/mr/BatchMergeJobBuilder.java | 2 +-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 15 +++++++++++++
.../kylin/engine/mr/JobBuilderSupport.java | 4 ++--
.../engine/mr/common/AbstractHadoopJob.java | 2 ++
.../kylin/engine/mr/common/BatchConstants.java | 4 +++-
.../engine/mr/steps/BaseCuboidMapperBase.java | 6 ++---
.../engine/mr/steps/CreateDictionaryJob.java | 6 ++---
.../apache/kylin/engine/mr/steps/CuboidJob.java | 10 ++++-----
.../engine/mr/steps/FactDistinctColumnsJob.java | 14 ++++++------
.../mr/steps/FactDistinctColumnsMapperBase.java | 2 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 8 ++++----
.../engine/mr/steps/InMemCuboidMapper.java | 4 ++--
.../kylin/engine/mr/steps/MergeCuboidJob.java | 6 ++---
.../engine/mr/steps/MergeCuboidMapper.java | 6 ++---
.../kylin/engine/mr/steps/NDCuboidMapper.java | 6 ++---
.../engine/mr/steps/NDCuboidMapperTest.java | 4 ++--
.../test_kylin_cube_with_slr_1_new_segment.json | 1 +
.../test_streaming_table_cube_desc.json | 2 +-
.../storage/hbase/steps/CreateHTableJob.java | 8 
+++---- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 2 +- 26 files changed, 94 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java index 0eaae20..454f6cf 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java @@ -69,7 +69,7 @@ public class KafkaDataLoader extends StreamDataLoader { List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList(); for (int i = 0; i < messages.size(); ++i) { - KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); + KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); keyedMessages.add(keyedMessage); } producer.send(keyedMessages); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index a456537..4533ae6 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -421,10 +421,14 @@ public class CubeManager implements IRealizationProvider { // try figure out a reasonable start if missing if (startDate == 0 && startOffset == 0) { boolean isOffsetsOn = endOffset != 0; - if 
(isOffsetsOn) - startOffset = calculateStartDateForAppendSegment(cube); - else + if (isOffsetsOn) { + startOffset = calculateStartOffsetForAppendSegment(cube); + if (startOffset == Long.MAX_VALUE) { + throw new IllegalStateException("There is already one pending for building segment, please submit request later."); + } + } else { startDate = calculateStartDateForAppendSegment(cube); + } } } else { startDate = 0; @@ -570,12 +574,23 @@ public class CubeManager implements IRealizationProvider { return max; } + + private long calculateStartOffsetForAppendSegment(CubeInstance cube) { + List<CubeSegment> existing = cube.getSegments(); + if (existing.isEmpty()) { + return 0; + } else { + return existing.get(existing.size() - 1).getSourceOffsetEnd(); + } + } + + private long calculateStartDateForAppendSegment(CubeInstance cube) { List<CubeSegment> existing = cube.getSegments(); if (existing.isEmpty()) { return cube.getDescriptor().getPartitionDateStart(); } else { - return existing.get(existing.size() - 1).getSourceOffsetEnd(); + return existing.get(existing.size() - 1).getDateRangeStart(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 3b65d1f..d3b0782 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.metadata.model.SegmentStatusEnum; import 
org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +34,9 @@ public class DictionaryGeneratorCLI { private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class); - public static void processSegment(KylinConfig config, String cubeName, String segmentName, DistinctColumnValuesProvider factTableValueProvider) throws IOException { + public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + CubeSegment segment = cube.getSegmentById(segmentID); processSegment(config, segment, factTableValueProvider); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index b24ef4d..04aea89 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -78,7 +78,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { if (cubeSegment == null) { this.tableName = "kylin_intermediate_" + cubeDesc.getName(); } else { - this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName(); + this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getUuid(); } int columnIndex = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 5a098a8..6c973eb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -94,7 +94,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); @@ -116,7 +116,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 93ae1e4..6eba3c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -110,7 +110,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); @@ -135,7 +135,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); @@ -161,7 +161,7 @@ public class 
BatchCubingJobBuilder2 extends JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java index 0769b52..33b6f29 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java @@ -86,7 +86,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index 10483eb..71a20a1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -88,6 +88,21 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { protected Class<? extends AbstractHadoopJob> getMergeCuboidJob() { return MergeCuboidJob.class; +// private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) { +// MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable(); +// mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID); +// StringBuilder cmd = new StringBuilder(); +// +// appendMapReduceParameters(cmd); +// appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName()); +// appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); +// appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); +// appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); +// appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); +// +// mergeCuboidDataStep.setMapReduceParams(cmd.toString()); +// mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); +// return mergeCuboidDataStep; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 86451c9..159e5cb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -68,7 +68,7 @@ public class JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId)); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats)); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); @@ -84,7 +84,7 @@ public class JobBuilderSupport { buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); buildDictionaryStep.setJobParams(cmd.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 04ecc71..af2ed9f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -79,7 +79,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME); protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME); protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID); +// @Deprecated protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME); + protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID); protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT); protected static final Option OPTION_OUTPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 387e695..f0503a8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -30,7 +30,8 @@ public interface BatchConstants { */ String CFG_CUBE_NAME = "cube.name"; - String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; +// String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; + String CFG_CUBE_SEGMENT_ID = "cube.segment.id"; String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level"; String CFG_II_NAME = "ii.name"; @@ -65,6 +66,7 @@ public interface BatchConstants { String ARG_CUBE_NAME = "cubename"; String ARG_II_NAME = "iiname"; String ARG_SEGMENT_NAME = "segmentname"; + String ARG_SEGMENT_ID = "segmentid"; String ARG_PARTITION = "partitions"; String ARG_STATS_ENABLED = "statisticsenabled"; String ARG_STATS_OUTPUT = "statisticsoutput"; http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 10fbba3..cc2bf7d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -60,7 
+60,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); public static final byte[] ONE = Bytes.toBytes("1"); protected String cubeName; - protected String segmentName; + protected String segmentID; protected Cuboid baseCuboid; protected CubeInstance cube; protected CubeDesc cubeDesc; @@ -86,7 +86,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL super.bindCurrentConfiguration(context.getConfiguration()); cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME); + segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER)); if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) { throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length); @@ -98,7 +98,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL cube = CubeManager.getInstance(config).getCube(cubeName); cubeDesc = cube.getDescriptor(); - cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + cubeSegment = cube.getSegmentById(segmentID); long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 
59233b9..69c0095 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -43,17 +43,17 @@ public class CreateDictionaryJob extends AbstractHadoopJob { try { options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); parseOptions(options, args); final String cubeName = getOptionValue(OPTION_CUBE_NAME); - final String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + final String segmentID = getOptionValue(OPTION_SEGMENT_ID); final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); KylinConfig config = KylinConfig.getInstanceFromEnv(); - DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() { + DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { @Override public ReadableTable getDistinctValuesFor(TblColRef col) { return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index f3524f8..90dec84 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -86,7 +86,7 @@ public class CuboidJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); 
options.addOption(OPTION_NCUBOID_LEVEL); @@ -97,14 +97,14 @@ public class CuboidJob extends AbstractHadoopJob { Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL)); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); if (checkSkip(cubingJobId)) { - logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeName + "[" + segmentName + "]"); + logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]"); return 0; } @@ -115,7 +115,7 @@ public class CuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); // Mapper - configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW)); + configureMapperInputFormat(cube.getSegmentById(segmentID)); job.setMapperClass(this.mapperClass); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); @@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel); // add metadata to distributed cache attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index f091ab9..a4b087b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -56,7 +56,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_CUBING_JOB_ID); options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_STATISTICS_ENABLED); options.addOption(OPTION_STATISTICS_OUTPUT); options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT); @@ -68,7 +68,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { String cubeName = getOptionValue(OPTION_CUBE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED); String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT); String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT); @@ -80,7 +80,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor()); job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, 
statistics_sampling_percent); @@ -88,10 +88,10 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); - CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + CubeSegment segment = cube.getSegmentById(segmentID); if (segment == null) { - logger.error("Failed to find {} in cube {}", segmentName, cube); - System.out.println("Failed to find {} in cube {} " + segmentName + "," + cube); + logger.error("Failed to find {} in cube {}", segmentID, cube); + System.out.println("Failed to find {} in cube {} " + segmentID + "," + cube); for (CubeSegment s : cube.getSegments()) { logger.error(s.getName() + " with status " + s.getStatus()); System.out.println(s.getName() + " with status " + s.getStatus()); @@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { logger.info("Found segment: " + segment); System.out.println("Found segment " + segment); } - setupMapper(segment); + setupMapper(cube.getSegmentById(segmentID)); setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? 
columnsNeedDict.size() + 1 : columnsNeedDict.size()); attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 35481fd..5680004 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -66,7 +66,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); cube = CubeManager.getInstance(config).getCube(cubeName); - cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW); + cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); cubeDesc = cube.getDescriptor(); baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 510dbe8..d50c8a5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -75,18 +75,18 @@ public class InMemCuboidJob extends AbstractHadoopJob { try { 
options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_CUBING_JOB_ID); parseOptions(options, args); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); String output = getOptionValue(OPTION_OUTPUT_PATH); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); - CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + CubeSegment cubeSeg = cube.getSegmentById(segmentID); String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID); if (checkSkip(cubingJobId)) { @@ -105,7 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); // set input IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 7baf5c5..75b6489 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -77,8 +77,8 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr String cubeName 
= conf.get(BatchConstants.CFG_CUBE_NAME); cube = CubeManager.getInstance(config).getCube(cubeName); cubeDesc = cube.getDescriptor(); - String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME); - cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); + cubeSegment = cube.getSegmentById(segmentID); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat(); Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index 5546bce..e0ae74d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -39,13 +39,13 @@ public class MergeCuboidJob extends CuboidJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); parseOptions(options, args); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase(); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); @@ -76,7 +76,7 @@ public class MergeCuboidJob extends CuboidJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - 
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); // add metadata to distributed cache attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 5fd321c..906ccdc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -65,7 +65,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private KylinConfig config; private String cubeName; - private String segmentName; + private String segmentID; private CubeManager cubeManager; private CubeInstance cube; private CubeDesc cubeDesc; @@ -95,14 +95,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { super.bindCurrentConfiguration(context.getConfiguration()); cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); + segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); config = AbstractHadoopJob.loadKylinPropsAndMetadata(); cubeManager = CubeManager.getInstance(config); cube = cubeManager.getCube(cubeName); cubeDesc = cube.getDescriptor(); - mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + mergedCubeSegment = cube.getSegmentById(segmentID); // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will 
auto-grow http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index d822134..fbd02ac 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -52,7 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private Text outputKey = new Text(); private String cubeName; - private String segmentName; + private String segmentID; private CubeSegment cubeSegment; private CubeDesc cubeDesc; private CuboidScheduler cuboidScheduler; @@ -70,12 +70,12 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { super.bindCurrentConfiguration(context.getConfiguration()); cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); + segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + cubeSegment = cube.getSegmentById(segmentID); cubeDesc = cube.getDescriptor(); // initialize CubiodScheduler http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java index caf87e2..daab3b1 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java @@ -70,9 +70,9 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { public void testMapReduceWithSlr() throws IOException { String cubeName = "test_kylin_cube_with_slr_1_new_segment"; - String segmentName = "20130331080000_20131212080000"; + String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b"; mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json index 7bb8078..8c48ffd 100644 --- a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json +++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json @@ -7,6 +7,7 @@ "descriptor" : "test_kylin_cube_with_slr_desc", "cost" : 50, "segments" : [ { + "uuid" : "198va32a-a33e-4b69-83dd-0bb8b1f8c53b", "name" : "20130331080000_20131212080000", 
"storage_location_identifier" : "KYLIN-CUBE-TEST_KYLIN_CUBE_WITH_SLR_READY-F24668F6-DCFF-4CB6-A89B-77F1119DF8FA", "date_range_start" : 1364688000000, http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index 23e5b00..ef10c1e 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -106,7 +106,7 @@ } } ], "override_kylin_properties": { - "kylin.cube.algorithm": "random" + "kylin.cube.algorithm": "inmem" }, "notify_list" : [ ], "status_need_notify" : [ ], http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index b93e0a1..5c45673 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -66,7 +66,7 @@ public class CreateHTableJob extends AbstractHadoopJob { CubeInstance cube = null; CubeDesc cubeDesc = null; - String segmentName = null; + String segmentID = null; KylinConfig kylinConfig; Path partitionFilePath; @@ -75,7 +75,7 @@ public class CreateHTableJob extends AbstractHadoopJob { Options options = new Options(); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); 
options.addOption(OPTION_PARTITION_FILE_PATH); options.addOption(OPTION_STATISTICS_ENABLED); parseOptions(options, args); @@ -88,8 +88,8 @@ public class CreateHTableJob extends AbstractHadoopJob { cube = cubeMgr.getCube(cubeName); cubeDesc = cube.getDescriptor(); kylinConfig = cube.getConfig(); - segmentName = getOptionValue(OPTION_SEGMENT_NAME); - CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + segmentID = getOptionValue(OPTION_SEGMENT_ID); + CubeSegment cubeSegment = cube.getSegmentById(segmentID); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 0914827..0679feb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -87,7 +87,7 @@ public class HBaseMRSteps extends JobBuilderSupport { createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));