minor, add parserTimeStampField to KafkaConfig (#1405)


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2fc2c22
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2fc2c22
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2fc2c22

Branch: refs/heads/KYLIN-2606
Commit: b2fc2c220ba66482447a7631f90363382bcaa422
Parents: 1acd066
Author: 成 <cheng.w...@kyligence.io>
Authored: Wed Jun 28 11:23:51 2017 +0800
Committer: Billy(Yiming) Liu <liuyiming....@gmail.com>
Committed: Wed Jun 28 11:23:51 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/source/kafka/config/KafkaConfig.java  | 22 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 3323afb..5bce4e7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -103,7 +103,7 @@ public class KafkaMRInput implements IMRInput {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
             try {
-                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
+                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
             } catch (ReflectiveOperationException e) {
                 throw new IllegalArgumentException(e);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index a096344..547e738 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -58,6 +58,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
 
+    @JsonProperty("parserTimeStampField")
+    private String parserTimeStampField;
+
     @Deprecated
     @JsonProperty("margin")
     private long margin;
@@ -120,6 +123,14 @@ public class KafkaConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public void setParserTimeStampField(String parserTimeStampField) {
+        this.parserTimeStampField = parserTimeStampField;
+    }
+
+    public String getParserTimeStampField() {
+        return this.parserTimeStampField;
+    }
+
     public String getParserProperties() {
         return parserProperties;
     }
@@ -128,6 +139,17 @@ public class KafkaConfig extends RootPersistentEntity {
         this.parserProperties = parserProperties;
     }
 
+    public String getAllParserProperties() {
+        StringBuilder sb = new StringBuilder();
+        if (parserProperties != null)
+            sb.append(parserProperties);
+        if (parserTimeStampField != null) {
+            sb.append(";");
+            sb.append(parserTimeStampField);
+        }
+        return sb.toString();
+    }
+
     @Override
     public KafkaConfig clone() {
         try {

Reply via email to