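Refactor streaming parsers: StreamingParser.parse() now takes the message payload as a ByteBuffer instead of an Object cast to Kafka's MessageAndOffset; Kafka callers pass message().payload() and set the offset afterwards via the new StreamingMessage.setOffset()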
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd116a6c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd116a6c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd116a6c Branch: refs/heads/stream_m1 Commit: cd116a6c5625009fe40f4136792c382a9a355e5f Parents: 25081d9 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Jun 23 10:34:52 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Jun 24 15:39:29 2016 +0800 ---------------------------------------------------------------------- .../src/test/java/org/apache/kylin/job/DeployUtil.java | 2 +- .../org/apache/kylin/job/hadoop/invertedindex/IITest.java | 8 ++------ .../org/apache/kylin/common/util/StreamingMessage.java | 4 ++++ .../apache/kylin/source/kafka/KafkaStreamingInput.java | 3 ++- .../org/apache/kylin/source/kafka/StreamingParser.java | 3 ++- .../apache/kylin/source/kafka/StringStreamingParser.java | 10 ++++------ .../apache/kylin/source/kafka/TimedJsonStreamParser.java | 9 ++++----- .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java | 3 ++- .../org/apache/kylin/source/kafka/util/KafkaUtils.java | 3 ++- 9 files changed, 23 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index a0a9f88..d56dd64 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -168,7 +168,7 @@ public class DeployUtil { TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true"); StringBuilder sb = new StringBuilder(); for 
(String json : data) { - List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData(); + List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData(); sb.append(StringUtils.join(rowColumns, ",")); sb.append(System.getProperty("line.separator")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java index da25143..c34ce55 100644 --- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java +++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java @@ -38,11 +38,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.common.util.FIFOIterable; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.*; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; @@ -114,7 +110,7 @@ public class IITest extends LocalFileMetadataTestCase { @Nullable @Override public StreamingMessage apply(@Nullable MessageAndOffset input) { - return parser.parse(input); + return parser.parse(input.message().payload()); } }); StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis())); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java index f327db2..53ab195 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java @@ -51,6 +51,10 @@ public class StreamingMessage { return offset; } + public void setOffset(long offset) { + this.offset = offset; + } + public final long getTimestamp() { return timestamp; } http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 564c221..3243754 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -206,7 +206,8 @@ public class KafkaStreamingInput implements IStreamingInput { for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { offset++; consumeMsgCount++; - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); if (streamingParser.filter(streamingMessage)) { final long timestamp = streamingMessage.getTimestamp(); if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 9075c77..3bc42ac 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -40,6 +40,7 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; +import java.nio.ByteBuffer; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.common.util.TimeUtil; @@ -66,7 +67,7 @@ public abstract class StreamingParser { * @param message * @return StreamingMessage must not be NULL */ - abstract public StreamingMessage parse(Object message); + abstract public StreamingMessage parse(ByteBuffer message); abstract public boolean filter(StreamingMessage streamingMessage); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java index 5226899..37bcbfa 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -55,12 +55,10 @@ public final class StringStreamingParser extends StreamingParser { } @Override - public StreamingMessage parse(Object message) { - MessageAndOffset kafkaMessage = (MessageAndOffset) 
message; - final ByteBuffer payload = kafkaMessage.message().payload(); - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); - return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap()); + public StreamingMessage parse(ByteBuffer message) { + byte[] bytes = new byte[message.limit()]; + message.get(bytes); + return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap()); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 63f5637..4b1c579 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -37,9 +37,9 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.util.*; -import kafka.message.MessageAndOffset; import org.apache.commons.lang3.StringUtils; +import java.nio.ByteBuffer; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -95,10 +95,9 @@ public final class TimedJsonStreamParser extends StreamingParser { } @Override - public StreamingMessage parse(Object msg) { - MessageAndOffset messageAndOffset = (MessageAndOffset) msg; + public StreamingMessage parse(ByteBuffer buffer) { try { - Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); + Map<String, String> message = mapper.readValue(new 
ByteBufferBackedInputStream(buffer), mapType); Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); root.putAll(message); String tsStr = root.get(tsColName); @@ -119,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap()); + return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap()); } catch (IOException e) { logger.error("error", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java index 0e29a0c..19fc87f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java @@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication { offset++; consumeMsgCount++; - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); if (streamingParser.filter(streamingMessage)) { streamQueue.add(streamingMessage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index 
f506999..a2984b6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -162,7 +162,8 @@ public final class KafkaUtils { final ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp())); return streamingMessage.getTimestamp();