Repository: kylin Updated Branches: refs/heads/master b4329a66a -> 30a4162dd
KYLIN-1917 TopN counter merge performance improvement Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30a4162d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30a4162d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30a4162d Branch: refs/heads/master Commit: 30a4162dd7af4158c90b6965a5bcf14119fae2e1 Parents: 501c12a Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Oct 20 14:27:30 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Oct 20 14:27:52 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/measure/topn/TopNCounter.java | 16 +++++----------- .../kylin/source/kafka/job/SeekOffsetStep.java | 15 +++++++-------- 2 files changed, 12 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java index 072fe90..cf9978a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java @@ -96,14 +96,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { */ public void consolidate() { Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator); - - if (this.size() > this.capacity) { - for (int x = this.size() - 1; x >= capacity; x--) { - Counter<T> removed = counterList.remove(x); - this.counterMap.remove(removed.item); - } - } - + retain(capacity); ordered = true; } @@ -214,9 +207,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { assert newCapacity > 0; this.capacity = newCapacity; if (this.size() > newCapacity) { - for (int x = newCapacity; x < this.size(); x++) { - Counter<T> removed = counterList.remove(x); - this.counterMap.remove(removed.item); + Counter<T> toRemoved; + for (int i = 0, n = this.size() - newCapacity; i < n; i++) { + toRemoved = counterList.pollLast(); + this.counterMap.remove(toRemoved.item); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java index a26f39d..98d6e4d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java @@ -84,16 +84,15 @@ public class SeekOffsetStep extends AbstractExecutable { } } } + logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString()); + } - if (partitionInfos.size() > startOffsets.size()) { - // has new partition added - for (int x = startOffsets.size(); x < partitionInfos.size(); x++) { - long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); - startOffsets.put(partitionInfos.get(x).partition(), earliest); - } + if (partitionInfos.size() > startOffsets.size()) { + // has new partition added + for (int x = startOffsets.size(); x < partitionInfos.size(); x++) { + long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); + startOffsets.put(partitionInfos.get(x).partition(), earliest); } - - logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString()); } if (endOffsets.isEmpty()) {