This is an automated email from the ASF dual-hosted git repository. magang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e09d9d7b37a688b5bebd8f93aea41d1eaf3bcfec Author: ning.guo <357027...@qq.com> AuthorDate: Fri May 17 12:38:47 2019 +0800 KYLIN-4001 Allow user-specified time format using real-time for backend --- .../stream/core/source/MessageParserInfo.java | 22 ++++++++ .../stream/source/kafka/AbstractTimeParser.java | 38 +++++++++++++ .../kylin/stream/source/kafka/DateTimeParser.java | 55 +++++++++++++++++++ .../kylin/stream/source/kafka/LongTimeParser.java | 63 ++++++++++++++++++++++ .../stream/source/kafka/TimedJsonStreamParser.java | 24 +++++++-- webapp/app/partials/tables/table_detail.html | 8 +++ 6 files changed, 207 insertions(+), 3 deletions(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java index 89e36dc..4070ae6 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java @@ -28,6 +28,12 @@ public class MessageParserInfo { @JsonProperty("ts_col_name") private String tsColName; + @JsonProperty("ts_parser") + private String tsParser; + + @JsonProperty("ts_pattern") + private String tsPattern; + @JsonProperty("format_ts") private boolean formatTs; @@ -42,6 +48,22 @@ public class MessageParserInfo { this.tsColName = tsColName; } + public String getTsParser() { + return tsParser; + } + + public void setTsParser(String tsParser) { + this.tsParser = tsParser; + } + + public String getTsPattern() { + return tsPattern; + } + + public void setTsPattern(String tsPattern) { + this.tsPattern = tsPattern; + } + public boolean isFormatTs() { return formatTs; } diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java new file mode 100644 index 0000000..74a5e9b --- /dev/null +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/AbstractTimeParser.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.stream.source.kafka; + +import org.apache.kylin.stream.core.source.MessageParserInfo; + + +/** + * Created by guoning on 2019-04-29. + */ +public abstract class AbstractTimeParser { + + public AbstractTimeParser(MessageParserInfo parserInfo) { + } + + /** + * Parse a string time to a long value (epoch time) + * @param time + * @return + */ + abstract public long parseTime(String time) throws IllegalArgumentException; +} diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java new file mode 100644 index 0000000..0ae2239 --- /dev/null +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/DateTimeParser.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kylin.stream.source.kafka; + +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.kylin.stream.core.source.MessageParserInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by guoning on 2019-04-29. + */ +public class DateTimeParser extends AbstractTimeParser { + + private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class); + private String tsPattern = null; + private FastDateFormat formatter = null; + + + public DateTimeParser(MessageParserInfo parserInfo) { + super(parserInfo); + tsPattern = parserInfo.getTsPattern(); + try { + formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern); + } catch (Throwable e) { + throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'."); + } + } + + @Override + public long parseTime(String timeStr) throws IllegalArgumentException { + try { + return formatter.parse(timeStr).getTime(); + } catch (Throwable e) { + throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e); + } + } +} diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java new file mode 100644 index 0000000..de88847 --- /dev/null +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/LongTimeParser.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.stream.source.kafka; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.stream.core.source.MessageParserInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; + +/** + * Created by guoning on 2019-04-29. + */ +public class LongTimeParser extends AbstractTimeParser { + + private static final Logger logger = LoggerFactory.getLogger(LongTimeParser.class); + private String tsPattern = null; + + public LongTimeParser(MessageParserInfo parserInfo) { + super(parserInfo); + tsPattern = parserInfo.getTsPattern().toUpperCase(Locale.ENGLISH); + } + + /** + * Parse a string time to a long value (epoch time) + * + * @param time + * @return + */ + public long parseTime(String time) throws IllegalArgumentException { + long t; + if (StringUtils.isEmpty(time)) { + t = 0; + } else { + try { + t = Long.parseLong(time); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(e); + } + } + if ("S".equals(tsPattern)) { + t = t * 1000; + } + return t; + } +} diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java index 594e6a4..32e4111 100644 --- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java @@ -14,12 +14,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.stream.source.kafka; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -62,6 +63,8 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons private List<TblColRef> allColumns; private boolean formatTs = false;//not used private String tsColName = "timestamp"; + private String tsParser = null; + private AbstractTimeParser streamTimeParser; /** * the path of {"user" : {"name": "kite", "sex":"female"}} @@ -88,6 +91,21 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons } logger.info("Using parser field mapping by {}", parserInfo.getColumnToSourceFieldMapping()); } + this.tsParser = parserInfo.getTsParser(); + + if (!StringUtils.isEmpty(tsParser)) { + try { + Class clazz = Class.forName(tsParser); + Constructor constructor = clazz.getConstructor(MessageParserInfo.class); + streamTimeParser = (AbstractTimeParser) constructor.newInstance(parserInfo); + } catch (Exception e) { + throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", tsPattern " + parserInfo.getTsPattern() + ".", e); + } + } else { + parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser"); + parserInfo.setTsPattern("MS"); + streamTimeParser = new LongTimeParser(parserInfo); + } } mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE); @@ -108,7 +126,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons if (StringUtils.isEmpty(tsStr)) { t = 0; } else { - t = Long.valueOf(tsStr); + t = streamTimeParser.parseTime(tsStr); } ArrayList<String> result = Lists.newArrayList(); @@ -133,7 +151,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons } return new StreamingMessage(result, new KafkaPartitionPosition(record.partition(), record.offset()), t, - Collections.<String, Object> emptyMap()); + Collections.<String, Object>emptyMap()); } catch (IOException e) { logger.error("error", e); throw new RuntimeException(e); diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html index 85026c9..0cf4ed8 100644 --- a/webapp/app/partials/tables/table_detail.html +++ b/webapp/app/partials/tables/table_detail.html @@ -196,6 +196,14 @@ </th> <td>{{currentStreamingConfig.properties['bootstrap.servers']}}</td> </tr> + <tr> + <th>TSParse</th> + <td>{{currentStreamingConfig.parser_info.ts_parser}}</td> + </tr> + <tr> + <th>TSPattern</th> + <td>{{currentStreamingConfig.parser_info.ts_pattern}}</td> + </tr> </tbody> </table> </div>