KYLIN-2091 Add API to init the start-point (of each partition) for streaming cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c92f79ad Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c92f79ad Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c92f79ad Branch: refs/heads/master-hbase1.x Commit: c92f79ad39e562ed32fff1a55eb979f0593ed6e3 Parents: 5429006 Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Oct 17 17:40:57 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Oct 17 17:53:56 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 2 +- .../java/org/apache/kylin/cube/CubeSegment.java | 12 ++++++ .../org/apache/kylin/cube/model/CubeDesc.java | 13 ++++++ .../kylin/measure/topn/TopNMeasureType.java | 2 +- server-base/pom.xml | 7 ++++ .../kylin/rest/controller/CubeController.java | 10 ++--- .../rest/controller/CubeDescController.java | 44 +++++++++++++++++++- .../kylin/source/kafka/job/SeekOffsetStep.java | 8 +++- .../kylin/source/kafka/util/KafkaClient.java | 22 +++++++++- 9 files changed, 108 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 4942081..73ac788 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -804,7 +804,7 @@ abstract public class KylinConfigBase implements Serializable { } public int getMaxBuildingSegments() { - return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2")); + return 
Integer.parseInt(getOptional("kylin.cube.building.segment.max", "10")); } public void setMaxBuildingSegments(int maxBuildingSegments) { http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index fdf1fb0..eb5b389 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -564,6 +564,12 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) { this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; + long total = 0; + for (Long x : sourcePartitionOffsetEnd.values()) { + total += x; + } + + this.sourceOffsetEnd = total; } public Map<Integer, Long> getSourcePartitionOffsetStart() { @@ -572,5 +578,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) { this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; + long total = 0; + for (Long x : sourcePartitionOffsetStart.values()) { + total += x; + } + + this.sourceOffsetStart = total; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 4195451..bf3724a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -170,6 +170,10 @@ 
public class CubeDesc extends RootPersistentEntity implements IEngineAware { private Map<TblColRef, DeriveInfo> extendedColumnToHosts = Maps.newHashMap(); + @JsonProperty("partition_offset_start") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private Map<Integer, Long> partitionOffsetStart = Maps.newHashMap(); + public boolean isEnableSharding() { //in the future may extend to other storage that is shard-able return storageType != IStorageAware.ID_HBASE && storageType != IStorageAware.ID_HYBRID; @@ -1011,6 +1015,14 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { this.partitionDateEnd = partitionDateEnd; } + public Map<Integer, Long> getPartitionOffsetStart() { + return partitionOffsetStart; + } + + public void setPartitionOffsetStart(Map<Integer, Long> partitionOffsetStart) { + this.partitionOffsetStart = partitionOffsetStart; + } + /** Get columns that have dictionary */ public Set<TblColRef> getAllColumnsHaveDictionary() { Set<TblColRef> result = Sets.newLinkedHashSet(); @@ -1119,6 +1131,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { newCubeDesc.setOverrideKylinProps(cubeDesc.getOverrideKylinProps()); newCubeDesc.setConfig((KylinConfigExt) cubeDesc.getConfig()); newCubeDesc.updateRandomUuid(); + newCubeDesc.setPartitionOffsetStart(cubeDesc.getPartitionOffsetStart()); return newCubeDesc; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index fcf6d5e..2f93b77 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -262,7 +262,7 @@ public class 
TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { }; } - if (digest.aggregations.size() == 0 ) { + if (digest.aggregations.size() == 0) { // directly query the UHC column without sorting unmatchedDimensions.removeAll(literalCol); return new CapabilityInfluence() { http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/pom.xml ---------------------------------------------------------------------- diff --git a/server-base/pom.xml b/server-base/pom.xml index 1302051..67013e4 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -122,6 +122,13 @@ <artifactId>aspectjweaver</artifactId> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + <scope>provided</scope> + </dependency> + <!-- Test & Env --> <dependency> <groupId>org.apache.kylin</groupId> http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index be242c3..eefc452 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -536,22 +536,22 @@ public class CubeController extends BasicController { /** * get cube segment holes * - * @return true + * @return a list of CubeSegment, each representing a hole * @throws IOException */ - @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.GET }) + @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.GET }) @ResponseBody public List<CubeSegment> getHoles(@PathVariable String cubeName) { return cubeService.getCubeManager().calculateHoles(cubeName); } /** - * get cube segment 
holes + * fill cube segment holes * - * @return true + * @return a list of JobInstances to fill the holes * @throws IOException */ - @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.PUT }) @ResponseBody public List<JobInstance> fillHoles(@PathVariable String cubeName) { List<JobInstance> jobs = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java index 4e595cd..b7eaddd 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java @@ -19,10 +19,14 @@ package org.apache.kylin.rest.controller; import java.io.IOException; +import java.util.Map; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.rest.response.GeneralResponse; import org.apache.kylin.rest.service.CubeService; +import org.apache.kylin.source.kafka.util.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; @@ -68,8 +72,8 @@ public class CubeDescController extends BasicController { * Get detail information of the "Cube ID" * return CubeDesc instead of CubeDesc[] * - * @param cubeDescName - * Cube ID + * @param cubeName + * Cube Name * @return * @throws IOException */ @@ -88,6 +92,42 @@ public class CubeDescController extends BasicController { } } + /** + * Initiate the very beginning of a 
streaming cube. Will seek the latest offsets of each partition from streaming + * source (kafka) and record in the cube descriptor; in the first build job, it will use these offsets as the start point. + * @param cubeName + * @return + */ + @RequestMapping(value = "/{cubeName}/initStartOffsets", method = { RequestMethod.PUT }) + @ResponseBody + public GeneralResponse initStartOffsets(@PathVariable String cubeName) { + CubeInstance cubeInstance = cubeService.getCubeManager().getCube(cubeName); + + String msg = ""; + if (cubeInstance == null) { + msg = "Cube '" + cubeName + "' not found."; + throw new IllegalArgumentException(msg); + } + if (cubeInstance.getSourceType() != ISourceAware.ID_STREAMING) { + msg = "Cube '" + cubeName + "' is not a Streaming Cube."; + throw new IllegalArgumentException(msg); + } + + final GeneralResponse response = new GeneralResponse(); + try { + final Map<Integer, Long> startOffsets = KafkaClient.getCurrentOffsets(cubeInstance); + CubeDesc desc = cubeInstance.getDescriptor(); + desc.setPartitionOffsetStart(startOffsets); + cubeService.getCubeDescManager().updateCubeDesc(desc); + response.setProperty("result", "success"); + response.setProperty("offsets", startOffsets.toString()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return response; + } + public void setCubeService(CubeService cubeService) { this.cubeService = cubeService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java index 5751095..2a3dbb5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java @@
-61,6 +61,12 @@ public class SeekOffsetStep extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided."); } + final Map<Integer, Long> cubeDescStart = cube.getDescriptor().getPartitionOffsetStart(); + if (cube.getSegments().size() == 1 && cubeDescStart != null && cubeDescStart.size() > 0) { + logger.info("This is the first segment, and has initiated the start offsets, will use it"); + startOffsets = cubeDescStart; + } + final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); final String topic = kafakaConfig.getTopic(); @@ -110,8 +116,6 @@ public class SeekOffsetStep extends AbstractExecutable { } if (totalEndOffset > totalStartOffset) { - segment.setSourceOffsetStart(totalStartOffset); - segment.setSourceOffsetEnd(totalEndOffset); segment.setSourcePartitionOffsetStart(startOffsets); segment.setSourcePartitionOffsetEnd(endOffsets); segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset)); http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index 640cc53..685af6a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -17,14 +17,19 @@ */ package org.apache.kylin.source.kafka.util; +import com.google.common.collect.Maps; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.PartitionInfo; import 
org.apache.kafka.common.TopicPartition; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -44,7 +49,7 @@ public class KafkaClient { return producer; } - private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){ + private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) { Properties props = new Properties(); props.put("bootstrap.servers", brokers); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -111,5 +116,20 @@ public class KafkaClient { return consumer.position(topicPartition); } + public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) { + final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable()); + final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); + final String topic = kafakaConfig.getTopic(); + + Map<Integer, Long> startOffsets = Maps.newHashMap(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); + for (PartitionInfo partitionInfo : partitionInfos) { + long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition()); + startOffsets.put(partitionInfo.partition(), latest); + } + } + return startOffsets; + } }