KYLIN-2596 Enable generating multiple streaming messages with one input message in streaming parser
* Minor: remove unused imports.
* Enable generating multiple streaming messages from one input message in the streaming parser.
* Allow MR input to generate multiple rows of data.
* For multiple rows, outputKV() should be called for each row.
* Wrap each row's data processing in a try/catch.

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/edc4d4cc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/edc4d4cc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/edc4d4cc

Branch: refs/heads/KYLIN-2624
Commit: edc4d4cc558473476b18d48f232635e44640c27a
Parents: 7c0038d
Author: nichunen <zjsy...@sjtu.org>
Authored: Fri May 12 14:55:14 2017 +0800
Committer: hongbin ma <m...@kyligence.io>
Committed: Fri May 12 14:55:14 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |   2 +-
 .../kylin/common/util/StreamingMessage.java     |  62 ----------
 .../kylin/common/util/StreamingMessageRow.java  |  62 ++++++++++
 .../org/apache/kylin/engine/mr/IMRInput.java    |   4 +-
 .../mr/steps/FactDistinctColumnsMapper.java     | 113 ++++++++++---------
 .../engine/mr/steps/HiveToBaseCuboidMapper.java |  15 ++-
 .../engine/mr/steps/InMemCuboidMapper.java      |  15 ++-
 .../apache/kylin/source/hive/HiveMRInput.java   |   6 +-
 .../cardinality/ColumnCardinalityMapper.java    |  25 ++--
 .../apache/kylin/source/kafka/KafkaMRInput.java |  16 ++-
 .../kylin/source/kafka/StreamingParser.java     |   8 +-
 .../source/kafka/TimedJsonStreamParser.java     |  11 +-
 .../source/kafka/TimedJsonStreamParserTest.java |  22 ++--
 13 files changed, 194 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index fdcd52c..077c056 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -166,7 +166,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).get(0).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
deleted file mode 100644
index 981c8a8..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.common.util; - -import java.util.List; -import java.util.Map; - -/** - */ -public class StreamingMessage { - - private final List<String> data; - - private long offset; - - private long timestamp; - - private Map<String, Object> params; - - public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) { - this.data = data; - this.offset = offset; - this.timestamp = timestamp; - this.params = params; - } - - public final List<String> getData() { - return data; - } - - public final long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public final long getTimestamp() { - return timestamp; - } - - public Map<String, Object> getParams() { - return params; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java new file mode 100644 index 0000000..9b287d4 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.common.util; + +import java.util.List; +import java.util.Map; + +/** + */ +public class StreamingMessageRow { + + private final List<String> data; + + private long offset; + + private long timestamp; + + private Map<String, Object> params; + + public StreamingMessageRow(List<String> data, long offset, long timestamp, Map<String, Object> params) { + this.data = data; + this.offset = offset; + this.timestamp = timestamp; + this.params = params; + } + + public final List<String> getData() { + return data; + } + + public final long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public final long getTimestamp() { + return timestamp; + } + + public Map<String, Object> getParams() { + return params; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index 10d4879..6b0e557 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -24,6 +24,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; +import java.util.Collection; + /** * Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface. */ @@ -50,7 +52,7 @@ public interface IMRInput { public void configureJob(Job job); /** Parse a mapper input object into column values. */ - public String[] parseMapperInput(Object mapperInput); + public Collection<String[]> parseMapperInput(Object mapperInput); } /** http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index d36ae18..713b7f7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -44,6 +44,7 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; + /** */ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { @@ -157,70 +158,72 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB @Override public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { - String[] row = flatTableInputFormat.parseMapperInput(record); - - context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); - for (int i = 0; i < factDictCols.size(); i++) { - String fieldValue = row[dictionaryColumnIndex[i]]; - if (fieldValue == null) - continue; - - int reducerIndex; - if (uhcIndex[i] == 0) { - //for the normal dictionary column - reducerIndex = columnIndexToReducerBeginId.get(i); - } else { - //for the uhc - reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; - } + Collection<String[]> rowCollection = 
flatTableInputFormat.parseMapperInput(record); + + for (String[] row: rowCollection) { + context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); + for (int i = 0; i < factDictCols.size(); i++) { + String fieldValue = row[dictionaryColumnIndex[i]]; + if (fieldValue == null) + continue; + + int reducerIndex; + if (uhcIndex[i] == 0) { + //for the normal dictionary column + reducerIndex = columnIndexToReducerBeginId.get(i); + } else { + //for the uhc + reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; + } - tmpbuf.clear(); - byte[] valueBytes = Bytes.toBytes(fieldValue); - int size = valueBytes.length + 1; - if (size >= tmpbuf.capacity()) { - tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); - } - tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); - tmpbuf.put(valueBytes); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - DataType type = factDictCols.get(i).getType(); - sortableKey.init(outputKey, type); - //judge type - context.write(sortableKey, EMPTY_TEXT); - - // log a few rows for troubleshooting - if (rowCount < 10) { - logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + tmpbuf.clear(); + byte[] valueBytes = Bytes.toBytes(fieldValue); + int size = valueBytes.length + 1; + if (size >= tmpbuf.capacity()) { + tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); + } + tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); + tmpbuf.put(valueBytes); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); + DataType type = factDictCols.get(i).getType(); + sortableKey.init(outputKey, type); + //judge type + context.write(sortableKey, EMPTY_TEXT); + + // log a few rows for troubleshooting + if (rowCount < 10) { + logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + } } - } - if (collectStatistics) { - if (rowCount % 100 < samplingPercentage) { - if (isUsePutRowKeyToHllNewAlgorithm) { - putRowKeyToHLLNew(row); - } else { - putRowKeyToHLLOld(row); + if (collectStatistics) { + if (rowCount % 100 < samplingPercentage) { + if (isUsePutRowKeyToHllNewAlgorithm) { + putRowKeyToHLLNew(row); + } else { + putRowKeyToHLLOld(row); + } } - } - if (needFetchPartitionCol == true) { - String fieldValue = row[partitionColumnIndex]; - if (fieldValue != null) { - tmpbuf.clear(); - byte[] valueBytes = Bytes.toBytes(fieldValue); - int size = valueBytes.length + 1; - if (size >= tmpbuf.capacity()) { - tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); + if (needFetchPartitionCol == true) { + String fieldValue = row[partitionColumnIndex]; + if (fieldValue != null) { + tmpbuf.clear(); + byte[] valueBytes = Bytes.toBytes(fieldValue); + int size = valueBytes.length + 1; + if (size >= tmpbuf.capacity()) { + tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); + } + tmpbuf.put(MARK_FOR_PARTITION_COL); + tmpbuf.put(valueBytes); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); + sortableKey.init(outputKey, (byte) 0); + context.write(sortableKey, EMPTY_TEXT); } - tmpbuf.put(MARK_FOR_PARTITION_COL); - tmpbuf.put(valueBytes); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - sortableKey.init(outputKey, (byte) 0); - context.write(sortableKey, EMPTY_TEXT); } } + rowCount++; } - rowCount++; } private long countSizeInBytes(String[] row) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index 428f878..a04fb43 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.util.Collection; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; @@ -38,12 +39,14 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O @Override public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { - String[] row = flatTableInputFormat.parseMapperInput(value); - try { - outputKV(row, context); - - } catch (Exception ex) { - handleErrorRecord(row, ex); + Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value); + for (String[] row: rowCollection) { + try { + outputKV(row, context); + + } catch (Exception ex) { + handleErrorRecord(row, ex); + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index c0ff2f2..eee189c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -116,12 +117,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr @Override public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { // put each row to the queue - String[] row = flatTableInputFormat.parseMapperInput(record); - List<String> rowAsList = Arrays.asList(row); - - while (!future.isDone()) { - if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { - break; + Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); + + for(String[] row: rowCollection) { + List<String> rowAsList = Arrays.asList(row); + while (!future.isDone()) { + if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { + break; + } } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 2f348a0..d7a2c7e 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,6 +19,8 @@ package 
org.apache.kylin.source.hive; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Set; import org.apache.commons.lang.StringUtils; @@ -118,8 +120,8 @@ public class HiveMRInput implements IMRInput { } @Override - public String[] parseMapperInput(Object mapperInput) { - return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput); + public List<String[]> parseMapperInput(Object mapperInput) { + return Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index c712605..9033d67 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -20,6 +20,7 @@ package org.apache.kylin.source.hive.cardinality; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -68,22 +69,24 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab @Override public void doMap(T key, Object value, Context context) throws IOException, InterruptedException { ColumnDesc[] columns = tableDesc.getColumns(); - String[] values = tableInputFormat.parseMapperInput(value); + Collection<String[]> valuesCollection = tableInputFormat.parseMapperInput(value); - for (int m = 0; m < columns.length; m++) { - String field = columns[m].getName(); - String fieldValue = values[m]; - if (fieldValue == null) - fieldValue = "NULL"; + for (String[] values: valuesCollection) { + for (int m = 0; m < columns.length; m++) { + String field = columns[m].getName(); + String fieldValue = values[m]; + if (fieldValue == null) + fieldValue = "NULL"; - if (counter < 5 && m < 10) { - System.out.println("Get row " + counter + " column '" + field + "' value: " + fieldValue); + if (counter < 5 && m < 10) { + System.out.println("Get row " + counter + " column '" + field + "' value: " + fieldValue); + } + + getHllc(m).add(Bytes.toBytes(fieldValue.toString())); } - getHllc(m).add(Bytes.toBytes(fieldValue.toString())); + counter++; } - - counter++; } private HLLCounter getHllc(Integer key) { http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index c7b327f..500e1e9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -19,7 +19,9 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import javax.annotation.Nullable; @@ -32,7 +34,7 @@ import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessageRow; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.IMRInput; @@ -127,11 +129,17 @@ public class KafkaMRInput implements IMRInput { } @Override - public String[] parseMapperInput(Object mapperInput) { + public Collection<String[]> parseMapperInput(Object mapperInput) { Text text = (Text) mapperInput; ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()); - StreamingMessage streamingMessage = streamingParser.parse(buffer); - return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]); + List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer); + List<String[]> parsedDataCollection = new ArrayList<>(); + + for (StreamingMessageRow row: streamingMessageRowList) { + parsedDataCollection.add(row.getData().toArray(new String[row.getData().size()])); + } + + return parsedDataCollection; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 75f9c4b..2e3c11c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -26,7 +26,7 @@ import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import java.nio.ByteBuffer; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessageRow; import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.metadata.model.TblColRef; @@ -62,11 +62,11 @@ public abstract class StreamingParser { /** * @param message - * @return StreamingMessage must not be NULL + * @return List<StreamingMessageRow> must not be NULL */ - abstract public StreamingMessage parse(ByteBuffer message); + abstract public List<StreamingMessageRow> parse(ByteBuffer message); - abstract public boolean filter(StreamingMessage streamingMessage); + abstract public boolean filter(StreamingMessageRow streamingMessageRow); public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException { if (!StringUtils.isEmpty(parserName)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 6ff0d2f..de167b4 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -32,7 +32,7 @@ import java.util.Arrays; 
import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ByteBufferBackedInputStream; -import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessageRow; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +100,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } @Override - public StreamingMessage parse(ByteBuffer buffer) { + public List<StreamingMessageRow> parse(ByteBuffer buffer) { try { Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType); root.clear(); @@ -116,7 +116,10 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap()); + StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t, Collections.<String, Object>emptyMap()); + List<StreamingMessageRow> messageRowList = new ArrayList<StreamingMessageRow>(); + messageRowList.add(streamingMessageRow); + return messageRowList; } catch (IOException e) { logger.error("error", e); throw new RuntimeException(e); @@ -124,7 +127,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } @Override - public boolean filter(StreamingMessage streamingMessage) { + public boolean filter(StreamingMessageRow streamingMessageRow) { return true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java index 230ff00..8dc840b 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java @@ -28,7 +28,7 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessageRow; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.junit.AfterClass; @@ -65,8 +65,8 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); Object msg = mapper.readValue(new File(jsonFilePath), mapType); ByteBuffer buffer = getJsonByteBuffer(msg); - StreamingMessage sMsg = parser.parse(buffer); - List<String> result = sMsg.getData(); + List<StreamingMessageRow> msgList = parser.parse(buffer); + List<String> result = msgList.get(0).getData(); assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0)); assertEquals("755703618762862600", result.get(1)); assertEquals("false", result.get(2)); @@ -80,8 +80,8 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); Object msg = mapper.readValue(new File(jsonFilePath), mapType); ByteBuffer buffer = getJsonByteBuffer(msg); - StreamingMessage sMsg = parser.parse(buffer); - List<String> result = sMsg.getData(); + List<StreamingMessageRow> msgList = parser.parse(buffer); 
+ List<String> result = msgList.get(0).getData(); assertEquals("4853763947", result.get(0)); assertEquals("Noticias", result.get(1)); assertEquals("false", result.get(2)); @@ -96,8 +96,8 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { HashMap<String, Object> map = (HashMap<String, Object>) msg; Object array = map.get("mediaEntities"); ByteBuffer buffer = getJsonByteBuffer(msg); - StreamingMessage sMsg = parser.parse(buffer); - List<String> result = sMsg.getData(); + List<StreamingMessageRow> msgList = parser.parse(buffer); + List<String> result = msgList.get(0).getData(); System.out.println(result); } @@ -109,8 +109,8 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); Object msg = mapper.readValue(new File(jsonFilePath), mapType); ByteBuffer buffer = getJsonByteBuffer(msg); - StreamingMessage sMsg = parser.parse(buffer); - List<String> result = sMsg.getData(); + List<StreamingMessageRow> msgList = parser.parse(buffer); + List<String> result = msgList.get(0).getData(); } @@ -121,8 +121,8 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); Object msg = mapper.readValue(new File(jsonFilePath), mapType); ByteBuffer buffer = getJsonByteBuffer(msg); - StreamingMessage sMsg = parser.parse(buffer); - List<String> result = sMsg.getData(); + List<StreamingMessageRow> msgList = parser.parse(buffer); + List<String> result = msgList.get(0).getData(); assertEquals(StringUtils.EMPTY, result.get(0)); assertEquals(StringUtils.EMPTY, result.get(1)); }
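----------------------------------------------------------------------

The new StreamingParser contract above (parse() now returns a List<StreamingMessageRow> instead of a single StreamingMessage) is what lets one Kafka message expand into several rows. Below is a minimal sketch of a parser that takes advantage of this; it is not part of this commit. The class name MultiRecordStreamParser and the newline-delimited CSV record layout are assumptions for illustration only, and the sketch presumes the abstract base class requires nothing beyond the two methods shown in the diff.

package org.apache.kylin.source.kafka;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kylin.common.util.StreamingMessageRow;

/**
 * Illustrative only: expands one Kafka message carrying several
 * newline-delimited CSV records into several StreamingMessageRow objects.
 */
public class MultiRecordStreamParser extends StreamingParser {

    @Override
    public List<StreamingMessageRow> parse(ByteBuffer buffer) {
        String payload = StandardCharsets.UTF_8.decode(buffer).toString();
        List<StreamingMessageRow> rows = new ArrayList<>();
        for (String record : payload.split("\n")) {
            if (record.trim().isEmpty()) {
                continue;
            }
            // Hypothetical record layout: plain CSV whose first field is an epoch-millis timestamp.
            List<String> columns = Arrays.asList(record.split(","));
            long timestamp = Long.parseLong(columns.get(0));
            // Offset 0 and empty params mirror what TimedJsonStreamParser does in this commit.
            rows.add(new StreamingMessageRow(columns, 0, timestamp, Collections.<String, Object>emptyMap()));
        }
        return rows; // one input message may now yield more than one row
    }

    @Override
    public boolean filter(StreamingMessageRow streamingMessageRow) {
        return true;
    }
}

A parser plugged in through getStreamingParser() would additionally need the constructor that method invokes reflectively (TimedJsonStreamParser, for example, is constructed from the column list plus a properties argument); that wiring is omitted here.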