Repository: kylin
Updated Branches:
  refs/heads/master 1004a8532 -> 34596d355

KYLIN-2775 fix wrong tsColName parsing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7f47b51a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7f47b51a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7f47b51a

Branch: refs/heads/master
Commit: 7f47b51a3ebe0ccd0283fa79995bfb02f4c3919e
Parents: 363f858
Author: Billy Liu <billy...@apache.org>
Authored: Fri Sep 1 13:49:12 2017 +0800
Committer: æ <cheng.w...@kyligence.io>
Committed: Fri Sep 1 14:43:21 2017 +0800

----------------------------------------------------------------------
 .../template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json | 2 +-
 .../java/org/apache/kylin/source/kafka/StreamingParser.java | 2 +-
 .../org/apache/kylin/source/kafka/config/KafkaConfig.java | 7 ++++---
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f47b51a/examples/sample_cube/template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/sample_cube/template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json b/examples/sample_cube/template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json
index 6c02b27..02bf0d0 100644
--- a/examples/sample_cube/template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json
+++ b/examples/sample_cube/template/kafka/DEFAULT.KYLIN_STREAMING_TABLE.json
@@ -6,7 +6,7 @@
   "timeout": 60000,
   "bufferSize": 65536,
   "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
-  "parserProperties": "tsColName=order_time",
+  "parserTimeStampField": "order_time",
   "last_modified": 0,
   "clusters": [
     {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f47b51a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 2e3c11c..c2b5104 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -76,7 +76,7 @@ public abstract class StreamingParser {
             Constructor constructor = clazz.getConstructor(List.class, Map.class);
             return (StreamingParser) constructor.newInstance(columns, properties);
         } else {
-            throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
+            throw new IllegalStateException("Invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f47b51a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 547e738..ce1d706 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.MetadataConstants;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 
 /**
  */
@@ -141,12 +142,12 @@ public class KafkaConfig extends RootPersistentEntity {
 
     public String getAllParserProperties() {
         StringBuilder sb = new StringBuilder();
-        if (parserProperties != null)
-            sb.append(parserProperties);
         if (parserTimeStampField != null) {
+            sb.append(TimedJsonStreamParser.PROPERTY_TS_COLUMN_NAME + "=" + parserTimeStampField);
             sb.append(";");
-            sb.append(parserTimeStampField);
         }
+        if (parserProperties != null)
+            sb.append(parserProperties);
         return sb.toString();
     }
 