Repository: kylin Updated Branches: refs/heads/master b32ab335b -> eb3943792
KYLIN-1726 validation check for streaming cube Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eb394379 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eb394379 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eb394379 Branch: refs/heads/master Commit: eb3943792e9730c07da3b56961f8d23b991b75db Parents: b32ab33 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Oct 22 20:56:02 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Oct 25 11:54:50 2016 +0800 ---------------------------------------------------------------------- .../model/validation/CubeMetadataValidator.java | 3 +- .../validation/rule/StreamingCubeRule.java | 76 ++++++++++++++++++++ .../kylin/rest/controller/CubeController.java | 21 ++++-- .../apache/kylin/source/kafka/KafkaMRInput.java | 12 ++-- 4 files changed, 97 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java index c22930a..c2c5f89 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java @@ -23,6 +23,7 @@ import org.apache.kylin.cube.model.validation.rule.AggregationGroupRule; import org.apache.kylin.cube.model.validation.rule.DictionaryRule; import org.apache.kylin.cube.model.validation.rule.FunctionRule; import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule; +import org.apache.kylin.cube.model.validation.rule.StreamingCubeRule; /** * For cube metadata validator @@ -32,7 +33,7 @@ import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule; */ public class CubeMetadataValidator { @SuppressWarnings("unchecked") - private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule(), new DictionaryRule() }; + private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule(), new DictionaryRule(), new StreamingCubeRule() }; public ValidateContext validate(CubeDesc cube) { ValidateContext context = new ValidateContext(); http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java new file mode 100644 index 0000000..1d6b7cc --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.model.validation.rule; + +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DimensionDesc; +import org.apache.kylin.cube.model.validation.IValidatorRule; +import org.apache.kylin.cube.model.validation.ResultLevel; +import org.apache.kylin.cube.model.validation.ValidateContext; +import org.apache.kylin.metadata.model.ISourceAware; + +import org.apache.kylin.metadata.model.TblColRef; + +/** + * + */ +public class StreamingCubeRule implements IValidatorRule<CubeDesc> { + + /* + * (non-Javadoc) + * + * @see + * org.apache.kylin.metadata.validation.IValidatorRule#validate(java.lang.Object + * , org.apache.kylin.metadata.validation.ValidateContext) + */ + @Override + public void validate(CubeDesc cube, ValidateContext context) { + if (cube.getFactTableDesc().getSourceType() != ISourceAware.ID_STREAMING) { + return; + } + + if (cube.getLookupTableDescs() != null && cube.getLookupTableDescs().size() > 0) { + context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed."); + return; + } + + if (cube.getModel().getPartitionDesc() == null || cube.getModel().getPartitionDesc().getPartitionDateColumn() == null) { + context.addResult(ResultLevel.ERROR, "Must define a partition column."); + return; + } + + final TblColRef partitionCol = cube.getModel().getPartitionDesc().getPartitionDateColumnRef(); + boolean found = false; + for (DimensionDesc dimensionDesc : cube.getDimensions()) { + for (TblColRef dimCol : dimensionDesc.getColumnRefs()) { + if (dimCol.equals(partitionCol)) { + found = true; + break; + } + } + } + + if (found == false) { + context.addResult(ResultLevel.ERROR, "Partition column '" + partitionCol + "' isn't in dimension list."); + return; + } + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index dd10123..c70b506 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -551,6 +551,7 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.GET }) @ResponseBody public List<CubeSegment> getHoles(@PathVariable String cubeName) { + checkCubeName(cubeName); return cubeService.getCubeManager().calculateHoles(cubeName); } @@ -563,6 +564,7 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.PUT }) @ResponseBody public List<JobInstance> fillHoles(@PathVariable String cubeName) { + checkCubeName(cubeName); List<JobInstance> jobs = Lists.newArrayList(); List<CubeSegment> holes = cubeService.getCubeManager().calculateHoles(cubeName); @@ -619,15 +621,10 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { RequestMethod.PUT }) @ResponseBody public GeneralResponse initStartOffsets(@PathVariable String cubeName) { + checkCubeName(cubeName); CubeInstance cubeInstance = cubeService.getCubeManager().getCube(cubeName); - - String msg = ""; - if (cubeInstance == null) { - msg = "Cube '" + cubeName + "' not found."; - throw new IllegalArgumentException(msg); - } if (cubeInstance.getSourceType() != ISourceAware.ID_STREAMING) { - msg = "Cube '" + cubeName + "' is not a Streaming Cube."; + String msg = "Cube '" + cubeName + "' is not a Streaming Cube."; throw new IllegalArgumentException(msg); } @@ -670,6 +667,16 @@ public class CubeController extends BasicController { request.setMessage(message); } + private void checkCubeName(String cubeName) { + CubeInstance cubeInstance = cubeService.getCubeManager().getCube(cubeName); + + String msg = ""; + if (cubeInstance == null) { + msg = "Cube '" + cubeName + "' not found."; + throw new IllegalArgumentException(msg); + } + } + public void setCubeService(CubeService cubeService) { this.cubeService = cubeService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 54d8c92..fb2a949 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -87,6 +87,11 @@ public class KafkaMRInput implements IMRInput { this.columns = columns; this.kafkaConfig = kafkaConfig; this.conf = conf; + try { + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException(e); + } } @Override @@ -105,13 +110,6 @@ public class KafkaMRInput implements IMRInput { @Override public String[] parseMapperInput(Object mapperInput) { - if (streamingParser == null) { - try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException(e); - } - } Text text = (Text) mapperInput; ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()); StreamingMessage streamingMessage = streamingParser.parse(buffer);