Repository: kylin
Updated Branches:
  refs/heads/master b4329a66a -> 30a4162dd


KYLIN-1917 TopN counter merge performance improvement

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30a4162d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30a4162d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30a4162d

Branch: refs/heads/master
Commit: 30a4162dd7af4158c90b6965a5bcf14119fae2e1
Parents: 501c12a
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Oct 20 14:27:30 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/measure/topn/TopNCounter.java  | 16 +++++-----------
 .../kylin/source/kafka/job/SeekOffsetStep.java      | 15 +++++++--------
 2 files changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 072fe90..cf9978a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -96,14 +96,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      */
     public void consolidate() {
         Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
-
-        if (this.size() > this.capacity) {
-            for (int x = this.size() - 1; x >= capacity; x--) {
-                Counter<T> removed = counterList.remove(x);
-                this.counterMap.remove(removed.item);
-            }
-        }
-
+        retain(capacity);
         ordered = true;
     }
 
@@ -214,9 +207,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         assert newCapacity > 0;
         this.capacity = newCapacity;
         if (this.size() > newCapacity) {
-            for (int x = newCapacity; x < this.size(); x++) {
-                Counter<T> removed = counterList.remove(x);
-                this.counterMap.remove(removed.item);
+            Counter<T> toRemoved;
+            for (int i = 0, n = this.size() - newCapacity; i < n; i++) {
+                toRemoved = counterList.pollLast();
+                this.counterMap.remove(toRemoved.item);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index a26f39d..98d6e4d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -84,16 +84,15 @@ public class SeekOffsetStep extends AbstractExecutable {
                         }
                     }
                 }
+                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+            }
 
-                if (partitionInfos.size() > startOffsets.size()) {
-                    // has new partition added
-                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
-                    }
+            if (partitionInfos.size() > startOffsets.size()) {
+                // has new partition added
+                for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+                    long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+                    startOffsets.put(partitionInfos.get(x).partition(), earliest);
                 }
-
-                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
             }
 
             if (endOffsets.isEmpty()) {

Reply via email to