KYLIN-1917 TopN counter merge performance improvement Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0deabb65 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0deabb65 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0deabb65
Branch: refs/heads/KYLIN-1971 Commit: 0deabb6504dd901b97585a7c25aea7643c80a6f7 Parents: 111e792 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Oct 20 13:21:27 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Oct 20 14:27:52 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/measure/topn/Counter.java | 17 +- .../apache/kylin/measure/topn/TopNCounter.java | 234 +++++++------------ .../measure/topn/TopNCounterSerializer.java | 4 +- .../topn/TopNCounterSerializerTest.java | 2 +- .../measure/topn/TopNCounterBasicTest.java | 2 +- 5 files changed, 99 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java index 041ea2b..cd5b825 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java @@ -31,6 +31,8 @@ import java.io.ObjectOutput; public class Counter<T> implements Externalizable { protected T item; + + protected double count; // protected double error; @@ -42,10 +44,15 @@ public class Counter<T> implements Externalizable { public Counter(T item) { this.count = 0; - // this.error = 0; this.item = item; } + public Counter(T item, double count) { + this.item = item; + this.count = count; + } + + public T getItem() { return item; } @@ -54,13 +61,11 @@ public class Counter<T> implements Externalizable { return count; } - // public double getError() { - // return error; - // } - + public void setCount(double count) { + this.count = count; + } @Override public String toString() { - // return 
item + ":" + count + ':' + error; return item + ":" + count; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java index ab4b40e..072fe90 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -19,20 +19,22 @@ package org.apache.kylin.measure.topn; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kylin.common.util.Pair; - +import com.google.common.collect.Maps; import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib - * + * * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i> * data structure as described in: * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i> @@ -45,30 +47,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { public static final int EXTRA_SPACE_RATE = 50; protected int capacity; - private HashMap<T, ListNode2<Counter<T>>> counterMap; - protected DoublyLinkedList<Counter<T>> counterList; + private HashMap<T, Counter<T>> counterMap; + protected LinkedList<Counter<T>> counterList; // sorted by count, largest first, only while 'ordered' is true; offer() appends unsorted until consolidate() + private boolean ordered = true; + private boolean descending = true; /** * @param capacity maximum size (larger capacities improve accuracy) */ public TopNCounter(int capacity) { this.capacity = capacity; - counterMap = new HashMap<T, ListNode2<Counter<T>>>(); - counterList = new DoublyLinkedList<Counter<T>>(); + counterMap = Maps.newHashMap(); + counterList = Lists.newLinkedList(); } public int getCapacity() { return capacity; } - /** - * Algorithm: <i>Space-Saving</i> - * - * @param item stream element (<i>e</i>) - * @return false if item was already in the stream summary, true otherwise - */ - public boolean offer(T item) { - return offer(item, 1.0); + public LinkedList<Counter<T>> getCounterList() { + return counterList; + } + + public void offer(T item) { + offer(item, 1.0); } /** @@ -77,103 +79,42 @@ public class TopNCounter<T>
implements Iterable<Counter<T>> { * @param item stream element (<i>e</i>) * @return false if item was already in the stream summary, true otherwise */ - public boolean offer(T item, double incrementCount) { - return offerReturnAll(item, incrementCount).getFirst(); - } - - /** - * @param item stream element (<i>e</i>) - * @return item dropped from summary if an item was dropped, null otherwise - */ - public T offerReturnDropped(T item, double incrementCount) { - return offerReturnAll(item, incrementCount).getSecond(); - } - - /** - * @param item stream element (<i>e</i>) - * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped - */ - public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) { - ListNode2<Counter<T>> counterNode = counterMap.get(item); - boolean isNewItem = (counterNode == null); - T droppedItem = null; - if (isNewItem) { - - if (size() < capacity) { - counterNode = counterList.enqueue(new Counter<T>(item)); - } else { - counterNode = counterList.tail(); - Counter<T> counter = counterNode.getValue(); - droppedItem = counter.item; - counterMap.remove(droppedItem); - counter.item = item; - counter.count = 0.0; - } + public void offer(T item, double incrementCount) { + Counter<T> counterNode = counterMap.get(item); + if (counterNode == null) { + counterNode = new Counter<T>(item, incrementCount); counterMap.put(item, counterNode); - } - - incrementCounter(counterNode, incrementCount); - - return Pair.newPair(isNewItem, droppedItem); - } - - protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) { - Counter<T> counter = counterNode.getValue(); - counter.count += incrementCount; - - ListNode2<Counter<T>> nodeNext; - - if (incrementCount > 0) { - nodeNext = counterNode.getNext(); - } else { - nodeNext = counterNode.getPrev(); - } - counterList.remove(counterNode); - counterNode.prev = null; - counterNode.next = null; - - if 
(incrementCount > 0) { - while (nodeNext != null && counter.count >= nodeNext.getValue().count) { - nodeNext = nodeNext.getNext(); - } - if (nodeNext != null) { - counterList.addBefore(nodeNext, counterNode); - } else { - counterList.add(counterNode); - } - + counterList.add(counterNode); } else { - while (nodeNext != null && counter.count < nodeNext.getValue().count) { - nodeNext = nodeNext.getPrev(); - } - if (nodeNext != null) { - counterList.addAfter(nodeNext, counterNode); - } else { - counterList.enqueue(counterNode); - } + counterNode.setCount(counterNode.getCount() + incrementCount); } - + ordered = false; } - public List<T> peek(int k) { - List<T> topK = new ArrayList<T>(k); + /** + * Resort and keep the expected size + */ + public void consolidate() { + Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator); - for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) { - Counter<T> b = bNode.getValue(); - if (topK.size() == k) { - return topK; + if (this.size() > this.capacity) { + for (int x = this.size() - 1; x >= capacity; x--) { + Counter<T> removed = counterList.remove(x); + this.counterMap.remove(removed.item); } - topK.add(b.item); } - return topK; + ordered = true; } public List<Counter<T>> topK(int k) { - List<Counter<T>> topK = new ArrayList<Counter<T>>(k); - - for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) { - Counter<T> b = bNode.getValue(); + if (ordered == false) { + consolidate(); + } + List<Counter<T>> topK = new ArrayList<>(k); + Iterator<Counter<T>> iterator = counterList.iterator(); + while (iterator.hasNext()) { + Counter<T> b = iterator.next(); if (topK.size() == k) { return topK; } @@ -194,8 +135,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { public String toString() { StringBuilder sb = new StringBuilder(); sb.append('['); - for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = 
bNode.getPrev()) { - Counter<T> b = bNode.getValue(); + Iterator<Counter<T>> iterator = counterList.iterator(); + while (iterator.hasNext()) { + Counter<T> b = iterator.next(); sb.append(b.item); sb.append(':'); sb.append(b.count); @@ -211,10 +153,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { * @param count */ public void offerToHead(T item, double count) { - Counter<T> c = new Counter<T>(item); - c.count = count; - ListNode2<Counter<T>> node = counterList.add(c); - counterMap.put(c.item, node); + Counter<T> c = new Counter<T>(item, count); + counterList.addFirst(c); + counterMap.put(c.item, c); } /** @@ -225,19 +166,19 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { public TopNCounter<T> merge(TopNCounter<T> another) { double m1 = 0.0, m2 = 0.0; if (this.size() >= this.capacity) { - m1 = this.counterList.tail().getValue().count; + m1 = this.counterList.getLast().count; } if (another.size() >= another.capacity) { - m2 = another.counterList.tail().getValue().count; + m2 = another.counterList.getLast().count; } Set<T> duplicateItems = Sets.newHashSet(); List<T> notDuplicateItems = Lists.newArrayList(); - for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) { + for (Map.Entry<T, Counter<T>> entry : this.counterMap.entrySet()) { T item = entry.getKey(); - ListNode2<Counter<T>> existing = another.counterMap.get(item); + Counter<T> existing = another.counterMap.get(item); if (existing != null) { duplicateItems.add(item); } else { @@ -246,21 +187,22 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { } for (T item : duplicateItems) { - this.offer(item, another.counterMap.get(item).getValue().count); + this.offer(item, another.counterMap.get(item).count); } for (T item : notDuplicateItems) { this.offer(item, m2); } - for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) { + for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) { T item = entry.getKey(); if 
(duplicateItems.contains(item) == false) { - double counter = entry.getValue().getValue().count; + double counter = entry.getValue().count; this.offer(item, counter + m1); } } + this.consolidate(); return this; } @@ -271,13 +213,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { public void retain(int newCapacity) { assert newCapacity > 0; this.capacity = newCapacity; - if (newCapacity < this.size()) { - ListNode2<Counter<T>> tail = counterList.tail(); - while (tail != null && this.size() > newCapacity) { - Counter<T> bucket = tail.getValue(); - this.counterMap.remove(bucket.getItem()); - this.counterList.remove(tail); - tail = this.counterList.tail(); + if (this.size() > newCapacity) { + if (ordered == false) { consolidate(); } // re-sort (and trim to the new capacity) counters offered since the last consolidate + while (this.size() > newCapacity) { // always drop the tail (smallest count) so removals never shift the remaining indices + this.counterMap.remove(counterList.removeLast().item); } } @@ -291,10 +230,15 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { double[] counters = new double[size()]; int index = 0; - for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) { - Counter<T> b = bNode.getValue(); - counters[index] = b.count; - index++; + if (this.descending == true) { + Iterator<Counter<T>> iterator = counterList.descendingIterator(); + while (iterator.hasNext()) { + Counter<T> b = iterator.next(); + counters[index] = b.count; + index++; + } + } else { + throw new IllegalStateException(); // support in future } assert index == size(); @@ -303,37 +247,27 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { @Override public Iterator<Counter<T>> iterator() { - return new TopNCounterIterator(); - } - - /** - * Iterator from the tail (smallest) to head (biggest); - */ - private class TopNCounterIterator implements Iterator<Counter<T>> { - - private ListNode2<Counter<T>> currentBNode; - - private TopNCounterIterator() { - currentBNode = counterList.tail(); + if (this.descending == true) { + return this.counterList.descendingIterator(); + } else
{ + throw new IllegalStateException(); // support in future } + } + private static final Comparator ASC_Comparator = new Comparator<Counter>() { @Override - public boolean hasNext() { - return currentBNode != null; - + public int compare(Counter o1, Counter o2) { + return o1.getCount() > o2.getCount() ? 1 : o1.getCount() == o2.getCount() ? 0 : -1; } - @Override - public Counter<T> next() { - Counter<T> counter = currentBNode.getValue(); - currentBNode = currentBNode.getNext(); - return counter; - } + }; + private static final Comparator DESC_Comparator = new Comparator<Counter>() { @Override - public void remove() { - throw new UnsupportedOperationException(); + public int compare(Counter o1, Counter o2) { + return o1.getCount() > o2.getCount() ? -1 : o1.getCount() == o2.getCount() ? 0 : 1; } - } + + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java index 604365c..071e2a2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java @@ -65,8 +65,8 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr @Override public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) { double[] counters = value.getCounters(); - List<ByteArray> peek = value.peek(1); - int keyLength = peek.size() > 0 ? peek.get(0).length() : 0; + List<Counter<ByteArray>> peek = value.topK(1); + int keyLength = peek.size() > 0 ? 
peek.get(0).getItem().length() : 0; out.putInt(value.getCapacity()); out.putInt(value.size()); out.putInt(keyLength); http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java index 7e7fd31..dedb4f5 100644 --- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java @@ -55,7 +55,7 @@ public class TopNCounterSerializerTest extends LocalFileMetadataTestCase { for (Integer i : stream) { vs.offer(new ByteArray(Bytes.toBytes(i))); } - + vs.consolidate(); ByteBuffer out = ByteBuffer.allocate(1024); serializer.serialize(vs, out); http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java index cb92338..162ef01 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java @@ -44,7 +44,7 @@ public class TopNCounterBasicTest { @Test public void testTopK() { - TopNCounter<String> vs = new TopNCounter<String>(3); + TopNCounter<String> vs = new TopNCounter<>(3); String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" }; for (String i : stream) { vs.offer(i);