minor, add parserTimeStampField to KafkaConfig (#1405)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2fc2c22 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2fc2c22 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2fc2c22 Branch: refs/heads/KYLIN-2606 Commit: b2fc2c220ba66482447a7631f90363382bcaa422 Parents: 1acd066 Author: æ <cheng.w...@kyligence.io> Authored: Wed Jun 28 11:23:51 2017 +0800 Committer: Billy(Yiming) Liu <liuyiming....@gmail.com> Committed: Wed Jun 28 11:23:51 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../kylin/source/kafka/config/KafkaConfig.java | 22 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 3323afb..5bce4e7 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -103,7 +103,7 @@ public class KafkaMRInput implements IMRInput { this.cubeSegment = cubeSegment; this.conf = conf; try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns); } catch (ReflectiveOperationException e) { throw new IllegalArgumentException(e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b2fc2c22/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index a096344..547e738 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -58,6 +58,9 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("parserName") private String parserName; + @JsonProperty("parserTimeStampField") + private String parserTimeStampField; + @Deprecated @JsonProperty("margin") private long margin; @@ -120,6 +123,14 @@ public class KafkaConfig extends RootPersistentEntity { this.margin = margin; } + public void setParserTimeStampField(String parserTimeStampField) { + this.parserTimeStampField = parserTimeStampField; + } + + public String getParserTimeStampField() { + return this.parserTimeStampField; + } + public String getParserProperties() { return parserProperties; } @@ -128,6 +139,17 @@ public class KafkaConfig extends RootPersistentEntity { this.parserProperties = parserProperties; } + public String getAllParserProperties() { + StringBuilder sb = new StringBuilder(); + if (parserProperties != null) + sb.append(parserProperties); + if (parserTimeStampField != null) { + sb.append(";"); + sb.append(parserTimeStampField); + } + return sb.toString(); + } + @Override public KafkaConfig clone() { try {