Repository: kylin
Updated Branches:
  refs/heads/master b32ab335b -> eb3943792


KYLIN-1726 validation check for streaming cube

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eb394379
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eb394379
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eb394379

Branch: refs/heads/master
Commit: eb3943792e9730c07da3b56961f8d23b991b75db
Parents: b32ab33
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sat Oct 22 20:56:02 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Oct 25 11:54:50 2016 +0800

----------------------------------------------------------------------
 .../model/validation/CubeMetadataValidator.java |  3 +-
 .../validation/rule/StreamingCubeRule.java      | 76 ++++++++++++++++++++
 .../kylin/rest/controller/CubeController.java   | 21 ++++--
 .../apache/kylin/source/kafka/KafkaMRInput.java | 12 ++--
 4 files changed, 97 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
index c22930a..c2c5f89 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
@@ -23,6 +23,7 @@ import org.apache.kylin.cube.model.validation.rule.AggregationGroupRule;
 import org.apache.kylin.cube.model.validation.rule.DictionaryRule;
 import org.apache.kylin.cube.model.validation.rule.FunctionRule;
 import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule;
+import org.apache.kylin.cube.model.validation.rule.StreamingCubeRule;
 
 /**
  * For cube metadata validator
@@ -32,7 +33,7 @@ import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule;
  */
 public class CubeMetadataValidator {
     @SuppressWarnings("unchecked")
-    private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule(), new DictionaryRule() };
+    private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule(), new DictionaryRule(), new StreamingCubeRule() };
 
     public ValidateContext validate(CubeDesc cube) {
         ValidateContext context = new ValidateContext();

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
new file mode 100644
index 0000000..1d6b7cc
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.model.validation.rule;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.validation.IValidatorRule;
+import org.apache.kylin.cube.model.validation.ResultLevel;
+import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.metadata.model.ISourceAware;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ *
+ */
+public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.kylin.metadata.validation.IValidatorRule#validate(java.lang.Object
+     * , org.apache.kylin.metadata.validation.ValidateContext)
+     */
+    @Override
+    public void validate(CubeDesc cube, ValidateContext context) {
+        if (cube.getFactTableDesc().getSourceType() != ISourceAware.ID_STREAMING) {
+            return;
+        }
+
+        if (cube.getLookupTableDescs() != null && cube.getLookupTableDescs().size() > 0) {
+            context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
+            return;
+        }
+
+        if (cube.getModel().getPartitionDesc() == null || cube.getModel().getPartitionDesc().getPartitionDateColumn() == null) {
+            context.addResult(ResultLevel.ERROR, "Must define a partition column.");
+            return;
+        }
+
+        final TblColRef partitionCol = cube.getModel().getPartitionDesc().getPartitionDateColumnRef();
+        boolean found = false;
+        for (DimensionDesc dimensionDesc : cube.getDimensions()) {
+            for (TblColRef dimCol : dimensionDesc.getColumnRefs()) {
+                if (dimCol.equals(partitionCol)) {
+                    found = true;
+                    break;
+                }
+            }
+        }
+
+        if (found == false) {
+            context.addResult(ResultLevel.ERROR, "Partition column '" + partitionCol + "' isn't in dimension list.");
+            return;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index dd10123..c70b506 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -551,6 +551,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.GET })
     @ResponseBody
     public List<CubeSegment> getHoles(@PathVariable String cubeName) {
+        checkCubeName(cubeName);
         return cubeService.getCubeManager().calculateHoles(cubeName);
     }
 
@@ -563,6 +564,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.PUT })
     @ResponseBody
     public List<JobInstance> fillHoles(@PathVariable String cubeName) {
+        checkCubeName(cubeName);
         List<JobInstance> jobs = Lists.newArrayList();
         List<CubeSegment> holes = cubeService.getCubeManager().calculateHoles(cubeName);
 
@@ -619,15 +621,10 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { RequestMethod.PUT })
     @ResponseBody
     public GeneralResponse initStartOffsets(@PathVariable String cubeName) {
+        checkCubeName(cubeName);
         CubeInstance cubeInstance = cubeService.getCubeManager().getCube(cubeName);
-
-        String msg = "";
-        if (cubeInstance == null) {
-            msg = "Cube '" + cubeName + "' not found.";
-            throw new IllegalArgumentException(msg);
-        }
         if (cubeInstance.getSourceType() != ISourceAware.ID_STREAMING) {
-            msg = "Cube '" + cubeName + "' is not a Streaming Cube.";
+            String msg = "Cube '" + cubeName + "' is not a Streaming Cube.";
             throw new IllegalArgumentException(msg);
         }
 
@@ -670,6 +667,16 @@ public class CubeController extends BasicController {
         request.setMessage(message);
     }
 
+    private void checkCubeName(String cubeName) {
+        CubeInstance cubeInstance = cubeService.getCubeManager().getCube(cubeName);
+
+        String msg = "";
+        if (cubeInstance == null) {
+            msg = "Cube '" + cubeName + "' not found.";
+            throw new IllegalArgumentException(msg);
+        }
+    }
+
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/eb394379/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 54d8c92..fb2a949 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -87,6 +87,11 @@ public class KafkaMRInput implements IMRInput {
             this.columns = columns;
             this.kafkaConfig = kafkaConfig;
             this.conf = conf;
+            try {
+                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
+            } catch (ReflectiveOperationException e) {
+                throw new IllegalArgumentException(e);
+            }
         }
 
         @Override
@@ -105,13 +110,6 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public String[] parseMapperInput(Object mapperInput) {
-            if (streamingParser == null) {
-                try {
-                    streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
-                } catch (ReflectiveOperationException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
             Text text = (Text) mapperInput;
             ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength());
             StreamingMessage streamingMessage = streamingParser.parse(buffer);

Reply via email to