KYLIN-1726 update to kafka 0.10
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b1b2e37 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b1b2e37 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b1b2e37 Branch: refs/heads/KYLIN-1726 Commit: 1b1b2e37fdcba7ad67f0fa3f2369aa65431f13bc Parents: 4e060e7 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Sep 2 19:25:57 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Sep 14 16:34:36 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 30 ++++++++++++---- .../kylin/rest/controller/CubeController.java | 8 ++--- .../apache/kylin/rest/service/JobService.java | 4 +-- .../source/kafka/util/KafkaSampleProducer.java | 38 ++++++++++++-------- 4 files changed, 53 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1b1b2e37/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index fc68798..11eabce 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -679,12 +679,28 @@ public class CubeManager implements IRealizationProvider { return null; } - if (cube.getBuildingSegments().size() > 0) { - logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment"); - return null; + List<CubeSegment> buildingSegs = cube.getBuildingSegments(); + if (buildingSegs.size() > 0) { + logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments"); + } + + List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY); + + List<CubeSegment> mergingSegs = Lists.newArrayList(); + if (buildingSegs.size() > 0) { + + for (CubeSegment building : buildingSegs) { + // exclude those under-merging segs + for (CubeSegment ready : readySegs) { + if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) { + mergingSegs.add(ready); + } + } + } } - List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY); + // exclude those already under merging segments + readySegs.removeAll(mergingSegs); long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges(); Arrays.sort(timeRanges); @@ -692,9 +708,9 @@ public class CubeManager implements IRealizationProvider { for (int i = timeRanges.length - 1; i >= 0; i--) { long toMergeRange = timeRanges[i]; - for (int s = 0; s < ready.size(); s++) { - CubeSegment seg = ready.get(s); - Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), // + for (int s = 0; s < readySegs.size(); s++) { + CubeSegment seg = readySegs.get(s); + Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), // seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange); if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange) return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1b1b2e37/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 42b117c..669f53e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -272,7 +272,7 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) { - return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()); + return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment()); } /** Build/Rebuild a cube segment by source offset */ @@ -286,16 +286,16 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) { - return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce()); + return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce()); } private JobInstance buildInternal(String cubeName, long startTime, long endTime, // - long startOffset, long endOffset, String buildType, boolean force) { + long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, // - CubeBuildTypeEnum.valueOf(buildType), force, submitter); + CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1b1b2e37/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 5c704ba..8929bf1 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -199,7 +199,7 @@ public class JobService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, // - CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException { + CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException { if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { throw new BadRequestException("Broken cube " + cube.getName() + " can't be built"); @@ -211,7 +211,7 @@ public class JobService extends BasicService { DefaultChainedExecutable job; if (buildType == CubeBuildTypeEnum.BUILD) { - CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset); + CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); http://git-wip-us.apache.org/repos/asf/kylin/blob/1b1b2e37/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java index 2a86a98..3d26d3d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java @@ -30,16 +30,15 @@ import java.util.Random; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kylin.common.util.OptionsHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - /** * A sample producer which will create sample data to kafka topic */ @@ -49,7 +48,8 @@ public class KafkaSampleProducer { @SuppressWarnings("static-access") private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic"); private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker"); - private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay"); + private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay"); + private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval"); private static final ObjectMapper mapper = new ObjectMapper(); @@ -61,6 +61,7 @@ public class KafkaSampleProducer { options.addOption(OPTION_TOPIC); options.addOption(OPTION_BROKER); options.addOption(OPTION_DELAY); + options.addOption(OPTION_INTERVAL); optionsHelper.parseOptions(options, args); logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); @@ -70,7 +71,13 @@ public class KafkaSampleProducer { long delay = 0; String delayString = optionsHelper.getOptionValue(OPTION_DELAY); if (delayString != null) { - delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY)); + delay = Long.parseLong(delayString); + } + + long interval = 1000; + String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL); + if (intervalString != null) { + interval = Long.parseLong(intervalString); } List<String> countries = new ArrayList(); @@ -95,13 +102,16 @@ public class KafkaSampleProducer { devices.add("Other"); Properties props = new Properties(); - props.put("metadata.broker.list", broker); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("request.required.acks", "1"); - - ProducerConfig config = new ProducerConfig(props); + props.put("bootstrap.servers", broker); + props.put("acks", "all"); + props.put("retries", 0); + props.put("batch.size", 16384); + props.put("linger.ms", 1); + props.put("buffer.memory", 33554432); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - Producer<String, String> producer = new Producer<String, String>(config); + Producer<String, String> producer = new KafkaProducer<>(props); boolean alive = true; Random rnd = new Random(); @@ -114,10 +124,10 @@ public class KafkaSampleProducer { record.put("qty", rnd.nextInt(10)); record.put("currency", "USD"); record.put("amount", rnd.nextDouble() * 100); - KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); + ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); System.out.println("Sending 1 message"); producer.send(data); - Thread.sleep(2000); + Thread.sleep(interval); } producer.close(); }