Revert "Revert "KYLIN-1726 use segment uuid instead of name""
This reverts commit 1f4880479cd3132786062723ba70312440de4805. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f89e35f6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f89e35f6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f89e35f6 Branch: refs/heads/master Commit: f89e35f6309c9bec43cc16e68fb35b7490aecc38 Parents: 8431af4 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Sep 24 14:56:17 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 5 +++- .../kylin/provision/BuildCubeWithStream.java | 26 +++++++++++++++++--- .../apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../source/kafka/hadoop/KafkaFlatTableJob.java | 11 +++------ .../kafka/hadoop/KafkaInputRecordReader.java | 9 ++++--- 5 files changed, 36 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 2fadedb..cc2baa5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -479,8 +479,11 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); return newSegment; } - public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { + return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true); + } + + public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException { checkNoBuildingSegment(cube); CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 7f79acc..9e779ab 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -21,6 +21,8 @@ package org.apache.kylin.provision; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.List; import java.util.TimeZone; import java.util.UUID; @@ -145,18 +147,34 @@ public class BuildCubeWithStream { //merge mergeSegment(cubeName, 0, 15000); + List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); + Assert.assertTrue(segments.size() == 1); + + CubeSegment toRefreshSeg = segments.get(0); + HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo(); + + refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap); + segments = cubeManager.getCube(cubeName).getSegments(); + Assert.assertTrue(segments.size() == 1); + } private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true); + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); return job.getId(); } - private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception { + CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); + segment.setAdditionalInfo(partitionOffsetMap); + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeBuilder.setToUpdateSegs(segment); + cubeManager.updateCube(cubeBuilder); + segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid()); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); @@ -164,7 +182,7 @@ public class BuildCubeWithStream { } private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index cfce137..a5f678f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput { jobBuilderSupport.appendMapReduceParameters(cmd); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); result.setMapReduceParams(cmd.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index decfb60..87d2471 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; @@ -70,14 +69,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_SEGMENT_ID); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); String cubeName = getOptionValue(OPTION_CUBE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String segmentId = getOptionValue(OPTION_SEGMENT_ID); // ---------------------------------------------------------------------------- // add metadata to distributed cache @@ -85,7 +84,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { CubeInstance cube = cubeMgr.getCube(cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); @@ -104,11 +103,9 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name - setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW)); + setupMapper(cube.getSegmentById(segmentId)); job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, output); FileOutputFormat.setCompressOutput(job, true); http://git-wip-us.apache.org/repos/asf/kylin/blob/f89e35f6/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index f67fef5..6774c9d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -105,6 +105,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit value = new BytesWritable(); } + if (watermark >= latestOffset) { + log.info("Reach the end offset, stop reading."); + return false; + } + if (messages == null) { log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark); TopicPartition topicPartition = new TopicPartition(topic, partition); @@ -119,10 +124,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit if (iterator.hasNext()) { ConsumerRecord<String, String> message = iterator.next(); - if (message.offset() >= latestOffset) { - log.info("Reach the end offset, stop reading."); - return false; - } key.set(message.offset()); byte[] valuebytes = Bytes.toBytes(message.value()); value.set(valuebytes, 0, valuebytes.length);