Repository: kylin Updated Branches: refs/heads/master 9284b47c3 -> 44c6fc9b8
KYLIN-1635 Refine serialize performance as well Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/44c6fc9b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/44c6fc9b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/44c6fc9b Branch: refs/heads/master Commit: 44c6fc9b8f5b39d217466a2b7887730a82566712 Parents: 9284b47 Author: Li Yang <liy...@apache.org> Authored: Fri Apr 29 12:15:10 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Apr 29 12:15:25 2016 +0800 ---------------------------------------------------------------------- .../gridtable/SortedGTRecordGenerator.java | 30 +- .../measure/hllc/HyperLogLogPlusCounter.java | 31 +- .../measure/hll/HyperLogLogCounterTest.java | 484 +++++++++---------- 3 files changed, 291 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/44c6fc9b/core-cube/src/test/java/org/apache/kylin/gridtable/SortedGTRecordGenerator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SortedGTRecordGenerator.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SortedGTRecordGenerator.java index 7e3c26e..d8b5e48 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/SortedGTRecordGenerator.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SortedGTRecordGenerator.java @@ -49,12 +49,17 @@ public class SortedGTRecordGenerator { } public void addMeasure(int length) { + addMeasure(length, new BytesRandomizer(length)); + } + + public void addMeasure(int length, Randomizer randomizer) { assert length > 0; ColSpec spec = new ColSpec(); spec.length = length; + spec.measureRandomizer = randomizer; colSpecs.add(spec); } - + public IGTScanner generate(long nRows) { validate(); return new Generator(nRows); @@ -75,6 +80,26 @@ public class SortedGTRecordGenerator { long cardinality; Map<Integer, Integer> weights; long weightSum; + Randomizer measureRandomizer; + } + + public interface Randomizer { + int fillRandom(Random rand, byte[] array, int offset); + } + + public static class BytesRandomizer implements Randomizer { + final private byte bytes[]; + + public BytesRandomizer(int len) { + this.bytes = new byte[len]; + } + + @Override + public int fillRandom(Random rand, byte[] array, int offset) { + rand.nextBytes(bytes); + System.arraycopy(bytes, 0, array, offset, bytes.length); + return bytes.length; + } } private class Generator implements IGTScanner { @@ -125,7 +150,8 @@ public class SortedGTRecordGenerator { } // measure case else { - rand.nextBytes(rec.get(i).array()); + int len = spec.measureRandomizer.fillRandom(rand, rec.get(i).array(), 0); + rec.get(i).setLength(len); } } counter++; http://git-wip-us.apache.org/repos/asf/kylin/blob/44c6fc9b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java index 29f7ffc..4e6a652 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java @@ -139,12 +139,18 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog } private int size() { - int size = 0; - for (int i = 0; i < m; i++) { - if (registers[i] > 0) - size++; + if (singleBucket == -1) { + return 0; + } else if (singleBucket >= 0) { + return 1; + } else { + int size = 0; + for (int i = 0; i < m; i++) { + if (registers[i] > 0) + size++; + } + return size; } - return size; } @Override @@ -210,10 +216,17 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog if (scheme == 0) { // map scheme BytesUtil.writeVInt(size, out); - for (int i = 0; i < m; i++) { - if (registers[i] > 0) { - writeUnsigned(i, indexLen, out); - out.put(registers[i]); + if (singleBucket == -1) { + ; // no non-zero register + } else if (singleBucket >= 0) { + writeUnsigned(singleBucket, indexLen, out); + out.put(registers[singleBucket]); + } else { + for (int i = 0; i < m; i++) { + if (registers[i] > 0) { + writeUnsigned(i, indexLen, out); + out.put(registers[i]); + } } } } else if (scheme == 1) { // array scheme http://git-wip-us.apache.org/repos/asf/kylin/blob/44c6fc9b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java index 77ca9f3..b42fd50 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java @@ -1,216 +1,214 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.hll; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.junit.Assert; -import org.junit.Test; - -/** - * @author yangli9 - * - */ -public class HyperLogLogCounterTest { - - ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); - Random rand1 = new Random(1); - Random rand2 = new Random(2); - Random rand3 = new Random(3); - int errorCount1 = 0; - int errorCount2 = 0; - int errorCount3 = 0; - - @Test - public void testOneAdd() throws IOException { - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(14); - HyperLogLogPlusCounter one = new HyperLogLogPlusCounter(14); - for (int i = 0; i < 1000000; i++) { - one.clear(); - one.add(rand1.nextInt()); - hllc.merge(one); - } - } - - @Test - public void testPeekLength() throws IOException { - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(10); - HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(10); - byte[] value = new byte[10]; - for (int i = 0; i < 200000; i++) { - rand1.nextBytes(value); - hllc.add(value); - - buf.clear(); - hllc.writeRegisters(buf); - - int len = buf.position(); - buf.position(0); - assertEquals(len, hllc.peekLength(buf)); - - copy.readRegisters(buf); - assertEquals(len, buf.position()); - assertEquals(hllc, copy); - } - buf.clear(); - } - - private Set<String> generateTestData(int n) { - Set<String> testData = new HashSet<String>(); - for (int i = 0; i < n; i++) { - String[] samples = generateSampleData(); - for (String sample : samples) { - testData.add(sample); - } - } - return testData; - } - - // simulate the visit (=visitor+id) - private String[] generateSampleData() { - - StringBuilder buf = new StringBuilder(); - for (int i = 0; i < 19; i++) { - buf.append(Math.abs(rand1.nextInt()) % 10); - } - String header = buf.toString(); - - int size = Math.abs(rand3.nextInt()) % 9 + 1; - String[] samples = new String[size]; - for (int k = 0; k < size; k++) { - buf = new StringBuilder(header); - buf.append("-"); - for (int i = 0; i < 10; i++) { - buf.append(Math.abs(rand3.nextInt()) % 10); - } - samples[k] = buf.toString(); - } - - return samples; - } - - @Test - public void countTest() throws IOException { - int n = 10; - for (int i = 0; i < 5; i++) { - count(n); - n *= 10; - } - } - - private void count(int n) throws IOException { - Set<String> testSet = generateTestData(n); - - HyperLogLogPlusCounter hllc = newHLLC(); - for (String testData : testSet) { - hllc.add(Bytes.toBytes(testData)); - } - long estimate = hllc.getCountEstimate(); - double errorRate = hllc.getErrorRate(); - double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size(); - System.out.println(estimate); - System.out.println(testSet.size()); - System.out.println(errorRate); - System.out.println("=" + actualError); - Assert.assertTrue(actualError < errorRate * 3.0); - - checkSerialize(hllc); - } - - private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException { - long estimate = hllc.getCountEstimate(); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - hllc.readRegisters(buf); - Assert.assertEquals(estimate, hllc.getCountEstimate()); - } - - @Test - public void mergeTest() throws IOException { - double error = 0; - double absError = 0; - int n = 100; - for (int i = 0; i < n; i++) { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hll; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author yangli9 + * + */ +public class HyperLogLogCounterTest { + + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + Random rand1 = new Random(1); + Random rand2 = new Random(2); + Random rand3 = new Random(3); + int errorCount1 = 0; + int errorCount2 = 0; + int errorCount3 = 0; + + @Test + public void testOneAdd() throws IOException { + HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(14); + HyperLogLogPlusCounter one = new HyperLogLogPlusCounter(14); + for (int i = 0; i < 1000000; i++) { + one.clear(); + one.add(rand1.nextInt()); + hllc.merge(one); + } + } + + @Test + public void testPeekLength() throws IOException { + HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(10); + HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(10); + byte[] value = new byte[10]; + for (int i = 0; i < 200000; i++) { + rand1.nextBytes(value); + hllc.add(value); + + buf.clear(); + hllc.writeRegisters(buf); + + int len = buf.position(); + buf.position(0); + assertEquals(len, hllc.peekLength(buf)); + + copy.readRegisters(buf); + assertEquals(len, buf.position()); + assertEquals(hllc, copy); + } + buf.clear(); + } + + private Set<String> generateTestData(int n) { + Set<String> testData = new HashSet<String>(); + for (int i = 0; i < n; i++) { + String[] samples = generateSampleData(); + for (String sample : samples) { + testData.add(sample); + } + } + return testData; + } + + // simulate the visit (=visitor+id) + private String[] generateSampleData() { + + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < 19; i++) { + buf.append(Math.abs(rand1.nextInt()) % 10); + } + String header = buf.toString(); + + int size = Math.abs(rand3.nextInt()) % 9 + 1; + String[] samples = new String[size]; + for (int k = 0; k < size; k++) { + buf = new StringBuilder(header); + buf.append("-"); + for (int i = 0; i < 10; i++) { + buf.append(Math.abs(rand3.nextInt()) % 10); + } + samples[k] = buf.toString(); + } + + return samples; + } + + @Test + public void countTest() throws IOException { + int n = 10; + for (int i = 0; i < 5; i++) { + count(n); + n *= 10; + } + } + + private void count(int n) throws IOException { + Set<String> testSet = generateTestData(n); + + HyperLogLogPlusCounter hllc = newHLLC(); + for (String testData : testSet) { + hllc.add(Bytes.toBytes(testData)); + } + long estimate = hllc.getCountEstimate(); + double errorRate = hllc.getErrorRate(); + double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size(); + System.out.println(estimate); + System.out.println(testSet.size()); + System.out.println(errorRate); + System.out.println("=" + actualError); + Assert.assertTrue(actualError < errorRate * 3.0); + + checkSerialize(hllc); + } + + private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException { + long estimate = hllc.getCountEstimate(); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + hllc.readRegisters(buf); + Assert.assertEquals(estimate, hllc.getCountEstimate()); + } + + @Test + public void mergeTest() throws IOException { + double error = 0; + int n = 100; + for (int i = 0; i < n; i++) { double e = merge(i); - error += e; - absError += Math.abs(e); - } + error += e; + } System.out.println("Total average error is " + error / n); - - System.out.println(" errorRateCount1 is " + errorCount1 + "!"); - System.out.println(" errorRateCount2 is " + errorCount2 + "!"); - System.out.println(" errorRateCount3 is " + errorCount3 + "!"); - + + System.out.println(" errorRateCount1 is " + errorCount1 + "!"); + System.out.println(" errorRateCount2 is " + errorCount2 + "!"); + System.out.println(" errorRateCount3 is " + errorCount3 + "!"); + Assert.assertTrue(errorCount1 <= n * 0.30); Assert.assertTrue(errorCount2 <= n * 0.05); - Assert.assertTrue(errorCount3 <= n * 0.02); - } - + Assert.assertTrue(errorCount3 <= n * 0.02); + } + private double merge(int round) throws IOException { int ln = 20; int dn = 100 * (round + 1); - Set<String> testSet = new HashSet<String>(); - HyperLogLogPlusCounter[] hllcs = new HyperLogLogPlusCounter[ln]; - for (int i = 0; i < ln; i++) { - hllcs[i] = newHLLC(); - for (int k = 0; k < dn; k++) { - String[] samples = generateSampleData(); - for (String data : samples) { - testSet.add(data); - hllcs[i].add(Bytes.toBytes(data)); - } - } - } - HyperLogLogPlusCounter mergeHllc = newHLLC(); - for (HyperLogLogPlusCounter hllc : hllcs) { + Set<String> testSet = new HashSet<String>(); + HyperLogLogPlusCounter[] hllcs = new HyperLogLogPlusCounter[ln]; + for (int i = 0; i < ln; i++) { + hllcs[i] = newHLLC(); + for (int k = 0; k < dn; k++) { + String[] samples = generateSampleData(); + for (String data : samples) { + testSet.add(data); + hllcs[i].add(Bytes.toBytes(data)); + } + } + } + HyperLogLogPlusCounter mergeHllc = newHLLC(); + for (HyperLogLogPlusCounter hllc : hllcs) { mergeHllc.merge(serDes(hllc)); - } - - double errorRate = mergeHllc.getErrorRate(); - long estimate = mergeHllc.getCountEstimate(); + } + + double errorRate = mergeHllc.getErrorRate(); + long estimate = mergeHllc.getCountEstimate(); double actualError = Math.abs((double) (testSet.size() - estimate) / testSet.size()); - - System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError); + + System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError); Assert.assertTrue(actualError < 0.1); - + if (actualError > errorRate) { - errorCount1++; - } + errorCount1++; + } if (actualError > 2 * errorRate) { - errorCount2++; - } + errorCount2++; + } if (actualError > 3 * errorRate) { - errorCount3++; - } - - return actualError; - } - + errorCount3++; + } + + return actualError; + } + private HyperLogLogPlusCounter serDes(HyperLogLogPlusCounter hllc) throws IOException { buf.clear(); hllc.writeRegisters(buf); @@ -220,46 +218,46 @@ public class HyperLogLogCounterTest { Assert.assertEquals(copy.getCountEstimate(), hllc.getCountEstimate()); return copy; } - - @Test - public void testPerformance() throws IOException { - int N = 3; // reduce N HLLC into one - int M = 1000; // for M times, use 100000 for real perf test - - HyperLogLogPlusCounter samples[] = new HyperLogLogPlusCounter[N]; - for (int i = 0; i < N; i++) { - samples[i] = newHLLC(); - for (String str : generateTestData(10000)) - samples[i].add(str); - } - - System.out.println("Perf test running ... "); - long start = System.currentTimeMillis(); - HyperLogLogPlusCounter sum = newHLLC(); - for (int i = 0; i < M; i++) { - sum.clear(); - for (int j = 0; j < N; j++) { - sum.merge(samples[j]); - checkSerialize(sum); - } - } - long duration = System.currentTimeMillis() - start; - System.out.println("Perf test result: " + duration / 1000 + " seconds"); - } - - @Test - public void testEquivalence() { - byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 }; - byte[] b = new byte[] { 3, 4, 42 }; - HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter(); - HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter(); - ha.add(a, 1, 3); - hb.add(b); - - Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate()); - } - - private HyperLogLogPlusCounter newHLLC() { - return new HyperLogLogPlusCounter(16); - } -} + + @Test + public void testPerformance() throws IOException { + int N = 3; // reduce N HLLC into one + int M = 1000; // for M times, use 100000 for real perf test + + HyperLogLogPlusCounter samples[] = new HyperLogLogPlusCounter[N]; + for (int i = 0; i < N; i++) { + samples[i] = newHLLC(); + for (String str : generateTestData(10000)) + samples[i].add(str); + } + + System.out.println("Perf test running ... "); + long start = System.currentTimeMillis(); + HyperLogLogPlusCounter sum = newHLLC(); + for (int i = 0; i < M; i++) { + sum.clear(); + for (int j = 0; j < N; j++) { + sum.merge(samples[j]); + checkSerialize(sum); + } + } + long duration = System.currentTimeMillis() - start; + System.out.println("Perf test result: " + duration / 1000 + " seconds"); + } + + @Test + public void testEquivalence() { + byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 }; + byte[] b = new byte[] { 3, 4, 42 }; + HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter(); + HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter(); + ha.add(a, 1, 3); + hb.add(b); + + Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate()); + } + + private HyperLogLogPlusCounter newHLLC() { + return new HyperLogLogPlusCounter(16); + } +}