KYLIN-1919 support embedded json format

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa51ce0c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa51ce0c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa51ce0c
Branch: refs/heads/orderedbytes
Commit: aa51ce0c3382d330ba5418b49eb669c964315f96
Parents: 792d4ee
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Sep 29 16:04:46 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  2 +-
 .../kylin/provision/BuildCubeWithEngine.java    |  2 +-
 .../kylin/provision/BuildCubeWithStream.java    | 16 ++--
 .../java/org/apache/kylin/rest/DebugTomcat.java |  3 +-
 .../kylin/source/kafka/AbstractTimeParser.java  |  4 +-
 .../kylin/source/kafka/DateTimeParser.java      | 40 ++-------
 .../kylin/source/kafka/DefaultTimeParser.java   |  4 +-
 .../kylin/source/kafka/StreamingParser.java     | 41 ++++++++-
 .../source/kafka/StringStreamingParser.java     |  3 +-
 .../source/kafka/TimedJsonStreamParser.java     | 95 ++++++++++++--------
 .../kafka/diagnose/KafkaInputAnalyzer.java      |  6 +-
 11 files changed, 133 insertions(+), 83 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..be9b2a9 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -156,7 +156,7 @@ public class DeployUtil {
         for (ColumnDesc columnDesc : tableDesc.getColumns()) {
             tableColumns.add(columnDesc.getRef());
         }
-        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
+        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
             List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 31cf0eb..971b293 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -84,7 +84,7 @@ public class BuildCubeWithEngine {
             afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 58715f1..f8805a6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -234,6 +234,8 @@ public class BuildCubeWithStream {
             segments = cubeManager.getCube(cubeName).getSegments();
             Assert.assertTrue(segments.size() == 1);
         }
+
+        logger.info("Build is done");
     }
 
@@ -309,20 +311,22 @@ public class BuildCubeWithStream {
     }
 
     public static void main(String[] args) throws Exception {
+        BuildCubeWithStream buildCubeWithStream = null;
         try {
             beforeClass();
-
-            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+            buildCubeWithStream = new BuildCubeWithStream();
             buildCubeWithStream.before();
             buildCubeWithStream.build();
-            logger.info("Build is done");
-            buildCubeWithStream.after();
-            afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
+        } finally {
+            if (buildCubeWithStream != null) {
+                buildCubeWithStream.after();
+            }
+            afterClass();
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 7417a05..0f2c500 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -48,7 +48,8 @@ public class DebugTomcat {
         System.setProperty("catalina.home", ".");
 
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+            System.err.println("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+            System.exit(1);
         }
 
         // workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
index 96a4ece..26624ef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.util.Map;
+
 /**
  */
 public abstract class AbstractTimeParser {
 
-    public AbstractTimeParser(String[] properties) {
+    public AbstractTimeParser(Map<String, String> properties) {
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
index 2bd699d..3382783 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -18,51 +18,29 @@ package org.apache.kylin.source.kafka;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.ParseException;
+import java.util.Map;
 
 /**
  */
 public class DateTimeParser extends AbstractTimeParser {
 
     private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
 
-    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+    private String tsPattern = null;
 
     private FastDateFormat formatter = null;
 
     //call by reflection
-    public DateTimeParser(String[] properties) {
+    public DateTimeParser(Map<String, String> properties) {
         super(properties);
-        for (String prop : properties) {
-            try {
-                String[] parts = prop.split("=");
-                if (parts.length == 2) {
-                    switch (parts[0]) {
-                    case "tsPattern":
-                        this.tsPattern = parts[1];
-                        break;
-                    default:
-                        break;
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("Failed to parse property " + prop);
-                //ignore
-            }
-        }
+        tsPattern = properties.get(StreamingParser.PROPERTY_TS_PATTERN);
 
-        if (!StringUtils.isEmpty(tsPattern)) {
-            try {
-                formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
-            } catch (Throwable e) {
-                throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
-            }
-        } else {
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Throwable e) {
             throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
         }
     }
@@ -77,8 +55,8 @@ public class DateTimeParser extends AbstractTimeParser {
 
         try {
             return formatter.parse(timeStr).getTime();
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+        } catch (Throwable e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 85f2bfa..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Map;
+
 /**
  */
 public class DefaultTimeParser extends AbstractTimeParser {
 
-    public DefaultTimeParser(String[] properties) {
+    public DefaultTimeParser(Map<String, String> properties) {
         super(properties);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 4d840b8..43b2ac5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -20,8 +20,10 @@ package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
@@ -30,12 +32,21 @@ import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
+ * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
  */
 public abstract class StreamingParser {
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamingParser.class);
+    public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
+    public static final String PROPERTY_TS_PARSER = "tsParser";
+    public static final String PROPERTY_TS_PATTERN = "tsPattern";
+    public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+
+    public static final Map<String, String> defaultProperties = Maps.newHashMap();
     public static final Set derivedTimeColumns = Sets.newHashSet();
     static {
         derivedTimeColumns.add("minute_start");
@@ -45,6 +56,10 @@
         derivedTimeColumns.add("month_start");
         derivedTimeColumns.add("quarter_start");
         derivedTimeColumns.add("year_start");
+        defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
+        defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
+        defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
     }
 
     /**
@@ -57,14 +72,34 @@ public abstract class StreamingParser {
 
     public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
         if (!StringUtils.isEmpty(parserName)) {
+            logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
             Class clazz = Class.forName(parserName);
-            Constructor constructor = clazz.getConstructor(List.class, String.class);
-            return (StreamingParser) constructor.newInstance(columns, parserProperties);
+            Map<String, String> properties = parseProperties(parserProperties);
+            Constructor constructor = clazz.getConstructor(List.class, Map.class);
+            return (StreamingParser) constructor.newInstance(columns, properties);
         } else {
             throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
         }
     }
 
+    public static Map<String, String> parseProperties(String propertiesStr) {
+
+        Map<String, String> result = Maps.newHashMap(defaultProperties);
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                String[] parts = prop.split("=");
+                if (parts.length == 2) {
+                    result.put(parts[0], parts[1]);
+                } else {
+                    logger.warn("Ignored invalid property expression '" + prop + "'");
+                }
+            }
+        }
+
+        return result;
+    }
+
     /**
      * Calculate the derived time column value and put to the result list.
      * @param columnName the column name, should be in lower case

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cea8e0b..f74df83 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -33,7 +34,7 @@ public final class StringStreamingParser extends StreamingParser {
 
     public static final StringStreamingParser instance = new StringStreamingParser(null, null);
 
-    private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
+    private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 2125c05..d4327c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.kafka;
 
@@ -42,7 +42,17 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.collect.Lists;
 
 /**
- * each json message with a "timestamp" field
+ * A utility class which parses a JSON streaming message into a list of strings (representing a row in the table).
+ *
+ * Each message should have a property whose value represents the message's timestamp; by default the column name is "timestamp",
+ * but it can be customized with StreamingParser#PROPERTY_TS_COLUMN_NAME.
+ *
+ * By default the timestamp column value is parsed as Unix time. If the format isn't Unix time, specify a time parser
+ * with the property StreamingParser#PROPERTY_TS_PARSER.
+ *
+ * It also supports embedded JSON format; use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concatenate
+ * the property names.
+ *
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -50,51 +60,34 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
     private List<TblColRef> allColumns;
     private final ObjectMapper mapper;
-    private String tsColName = "timestamp";
-    private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+    private String tsColName = null;
+    private String tsParser = null;
+    private String separator = null;
+
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
 
-    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+    public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
         this.allColumns = allColumns;
-        String[] properties = null;
-        if (!StringUtils.isEmpty(propertiesStr)) {
-            properties = propertiesStr.split(";");
-            for (String prop : properties) {
-                try {
-                    String[] parts = prop.split("=");
-                    if (parts.length == 2) {
-                        switch (parts[0]) {
-                        case "tsColName":
-                            this.tsColName = parts[1];
-                            break;
-                        case "tsParser":
-                            this.tsParser = parts[1];
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("Failed to parse property " + prop);
-                    //ignore
-                }
-            }
+        if (properties == null) {
+            properties = StreamingParser.defaultProperties;
         }
 
-        logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+        tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
+        tsParser = properties.get(PROPERTY_TS_PARSER);
+        separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
 
         if (!StringUtils.isEmpty(tsParser)) {
             try {
                 Class clazz = Class.forName(tsParser);
-                Constructor constructor = clazz.getConstructor(String[].class);
-                streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+                Constructor constructor = clazz.getConstructor(Map.class);
+                streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
             } catch (Exception e) {
-                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
             }
         } else {
-            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
         }
         mapper = new ObjectMapper();
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -108,7 +101,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
             Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
-            String tsStr = String.valueOf(root.get(tsColName));
+            String tsStr = objToString(root.get(tsColName));
             long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
@@ -116,8 +109,7 @@
                 String columnName = column.getName().toLowerCase();
 
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
-                    String x = String.valueOf(root.get(columnName));
-                    result.add(x);
+                    result.add(getValueByKey(columnName, root));
                 }
             }
 
@@ -133,4 +125,35 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return true;
     }
 
+    protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
+        if (root.containsKey(key)) {
+            return objToString(root.get(key));
+        }
+
+        if (key.contains(separator)) {
+            String[] names = key.toLowerCase().split(separator);
+            Map<String, Object> tempMap = null;
+            for (int i = 0; i < names.length - 1; i++) {
+                Object o = root.get(names[i]);
+                if (o instanceof Map) {
+                    tempMap = (Map<String, Object>) o;
+                } else {
+                    throw new IOException("Property '" + names[i] + "' is not embedded format");
+                }
+            }
+            Object finalObject = tempMap.get(names[names.length - 1]);
+            return objToString(finalObject);
+
+        }
+
+        return StringUtils.EMPTY;
+    }
+
+    static String objToString(Object value) {
+        if (value == null)
+            return StringUtils.EMPTY;
+
+        return String.valueOf(value);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index efaa042..b1b4011 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.diagnose;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -289,8 +291,10 @@ public class KafkaInputAnalyzer extends AbstractApplication {
         String task = optionsHelper.getOptionValue(OPTION_TASK);
         String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
 
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
         kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
-        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
 
         if ("disorder".equalsIgnoreCase(task)) {
             analyzeDisorder();
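
----------------------------------------------------------------------
Usage note (illustration only, not part of commit aa51ce0c): a minimal sketch of how the new parser properties can be used for an embedded JSON message. Only StreamingParser.parseProperties() and the property keys (tsColName, tsParser, tsPattern, separator) come from this patch; the class name and sample values below are made up.

    import java.util.Map;

    import org.apache.kylin.source.kafka.StreamingParser;

    public class EmbeddedJsonPropertiesExample {

        public static void main(String[] args) {
            // "tsColName" names the timestamp property in the message; "separator" is the
            // token that joins nested JSON property names into one flat column name.
            String parserProperties = "tsColName=ts;separator=_";

            // Anything not specified (tsParser, tsPattern, ...) is filled in from
            // StreamingParser.defaultProperties.
            Map<String, String> properties = StreamingParser.parseProperties(parserProperties);
            System.out.println(properties);

            // With separator "_", a cube column named USER_AGE is resolved against a message
            // such as {"ts": 1475215486000, "user": {"age": 30}}: the lower-cased name
            // "user_age" is split on "_" and the nested objects are walked (user -> age),
            // while "ts" is read directly and parsed as Unix time by DefaultTimeParser.
        }
    }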