This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 8cd36f1 [Spark Load] Support java version hyperloglog (#3320) 8cd36f1 is described below commit 8cd36f1c5dd250fc279076f411cff48ec40918c0 Author: wangbo <506340...@qq.com> AuthorDate: Sun Jun 21 09:37:05 2020 +0800 [Spark Load] Support java version hyperloglog (#3320) mainly used for Spark Load process to calculate approximate deduplication value and then serialize to parquet file. Try to keep the same calculation semantic with be's C++ version --- be/test/olap/hll_test.cpp | 2 +- .../java/org/apache/doris/load/loadv2/Hll.java | 396 +++++++++++++++++++++ .../java/org/apache/doris/load/loadv2/HllTest.java | 268 ++++++++++++++ 3 files changed, 665 insertions(+), 1 deletion(-) diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp index 0ec1c46..0b244ff 100644 --- a/be/test/olap/hll_test.cpp +++ b/be/test/olap/hll_test.cpp @@ -32,7 +32,7 @@ public: static uint64_t hash(uint64_t value) { return HashUtil::murmur_hash64A(&value, 8, 0); } - +// keep logic same with java version in fe when you change hll_test.cpp,see HllTest.java TEST_F(TestHll, Normal) { uint8_t buf[HLL_REGISTERS_COUNT + 1]; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/Hll.java b/fe/src/main/java/org/apache/doris/load/loadv2/Hll.java new file mode 100644 index 0000000..89eafb1 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/Hll.java @@ -0,0 +1,396 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
/**
 * Java implementation of HyperLogLog, used by the Spark Load pipeline to build
 * approximate-distinct-count (HLL) column values and serialize them in the same
 * binary format as the backend's C++ implementation.
 *
 * The structure evolves through storage modes as values are added:
 *   HLL_DATA_EMPTY    - nothing added yet
 *   HLL_DATA_EXPLICIT - up to HLL_EXPLICLIT_INT64_NUM raw 64-bit hashes kept in
 *                       a set, so cardinality is exact at low counts
 *   HLL_DATA_FULL     - 16K one-byte rank registers (classic HLL)
 * HLL_DATA_SPARSE is only a serialization format, chosen when few registers are
 * non-zero; in memory it is represented the same way as HLL_DATA_FULL.
 *
 * Usage:
 *   Hll hll = new Hll();
 *   hll.updateWithHash(value);
 *
 * This class is not thread-safe.
 */
public class Hll {

    // Storage-mode tags; values must match the BE (C++) serialization format.
    public static final byte HLL_DATA_EMPTY = 0;
    public static final byte HLL_DATA_EXPLICIT = 1;
    public static final byte HLL_DATA_SPARSE = 2;
    public static final byte HLL_DATA_FULL = 3;

    // 2^14 = 16384 registers; the low 14 hash bits pick the register.
    public static final int HLL_COLUMN_PRECISION = 14;
    public static final int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION);
    // Max raw hashes kept before switching EXPLICIT -> FULL.
    // (Name, including its typo, kept for compatibility with existing callers.)
    public static final int HLL_EXPLICLIT_INT64_NUM = 160;
    // Serialize as SPARSE while the non-zero register count is <= this bound.
    public static final int HLL_SPARSE_THRESHOLD = 4096;
    public static final int HLL_REGISTERS_COUNT = 16 * 1024;

    // MurmurHash2 64A constants; must stay identical to the BE hash.
    public static final long M64 = 0xc6a4a7935bd1e995L;
    public static final int R64 = 47;
    public static final int SEED = 0xadc83b19;

    private int type;
    // Raw hash values while in EXPLICIT mode; cleared on conversion to FULL.
    private Set<Long> hashSet;
    // Rank registers, allocated lazily on first conversion to FULL.
    private byte[] registers;

    public Hll() {
        this.type = HLL_DATA_EMPTY;
        this.hashSet = new HashSet<>();
    }

    /** Converts EXPLICIT storage to register form, replaying the stored hashes. */
    private void convertExplicitToRegister() {
        assert this.type == HLL_DATA_EXPLICIT;
        registers = new byte[HLL_REGISTERS_COUNT];
        for (Long value : hashSet) {
            updateRegisters(value);
        }
        hashSet.clear();
    }

    /** Folds one 64-bit hash into the register array. */
    private void updateRegisters(long hashValue) {
        // HLL_REGISTERS_COUNT is a power of two, so masking the low bits is the
        // unsigned modulo. This is also correct when hashValue is "negative",
        // i.e. an unsigned 64-bit value with the top bit set (the original code
        // routed that case through BigInteger to the same result).
        int idx = (int) (hashValue & (HLL_REGISTERS_COUNT - 1));

        // Remaining bits determine the rank: lowest 1-bit position + 1.
        // Setting bit HLL_ZERO_COUNT_BITS guarantees a non-zero value, which
        // caps the rank at HLL_ZERO_COUNT_BITS + 1.
        hashValue >>>= HLL_COLUMN_PRECISION;
        hashValue |= (1L << HLL_ZERO_COUNT_BITS);
        byte firstOneBit = (byte) (getLongTailZeroNum(hashValue) + 1);
        if (firstOneBit > registers[idx]) {
            registers[idx] = firstOneBit;
        }
    }

    /** Takes the element-wise maximum of this register array and {@code other}. */
    private void mergeRegisters(byte[] other) {
        for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
            if (other[i] > this.registers[i]) {
                this.registers[i] = other[i];
            }
        }
    }

    /**
     * Returns the number of trailing zero bits of {@code hashValue}.
     * Returns 0 for an input of 0, and is capped at 62 (both quirks preserved
     * from the original loop-based implementation so results stay identical).
     */
    public static byte getLongTailZeroNum(long hashValue) {
        if (hashValue == 0) {
            return 0;
        }
        return (byte) Math.min(Long.numberOfTrailingZeros(hashValue), 62);
    }

    /**
     * Hashes {@code String.valueOf(value)} as UTF-8 with murmur64 and adds it.
     * This must match how the BE hashes column values for HLL columns.
     */
    public void updateWithHash(Object value) {
        // Equivalent to commons-codec StringUtils.getBytesUtf8, without the
        // third-party dependency.
        byte[] v = String.valueOf(value).getBytes(StandardCharsets.UTF_8);
        update(hash64(v, v.length, SEED));
    }

    /** Adds a pre-computed 64-bit hash value, promoting the storage mode as needed. */
    public void update(long hashValue) {
        switch (this.type) {
            case HLL_DATA_EMPTY:
                hashSet.add(hashValue);
                type = HLL_DATA_EXPLICIT;
                break;
            case HLL_DATA_EXPLICIT:
                if (hashSet.size() < HLL_EXPLICLIT_INT64_NUM) {
                    hashSet.add(hashValue);
                    break;
                }
                convertExplicitToRegister();
                type = HLL_DATA_FULL;
                // intentional fall-through: the new hash still has to be registered
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                updateRegisters(hashValue);
                break;
        }
    }

    /** Merges {@code other} into this HLL; {@code other} is left unchanged. */
    public void merge(Hll other) {
        if (other.type == HLL_DATA_EMPTY) {
            return;
        }
        switch (this.type) {
            case HLL_DATA_EMPTY:
                // Adopt the other side's representation wholesale.
                this.type = other.type;
                switch (other.type) {
                    case HLL_DATA_EXPLICIT:
                        this.hashSet.addAll(other.hashSet);
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        this.registers = new byte[HLL_REGISTERS_COUNT];
                        System.arraycopy(other.registers, 0, this.registers, 0, HLL_REGISTERS_COUNT);
                        break;
                }
                break;
            case HLL_DATA_EXPLICIT:
                switch (other.type) {
                    case HLL_DATA_EXPLICIT:
                        this.hashSet.addAll(other.hashSet);
                        if (this.hashSet.size() > HLL_EXPLICLIT_INT64_NUM) {
                            convertExplicitToRegister();
                            this.type = HLL_DATA_FULL;
                        }
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        convertExplicitToRegister();
                        mergeRegisters(other.registers);
                        this.type = HLL_DATA_FULL;
                        break;
                }
                break;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                switch (other.type) {
                    case HLL_DATA_EXPLICIT:
                        for (long value : other.hashSet) {
                            update(value);
                        }
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        mergeRegisters(other.registers);
                        break;
                }
                break;
        }
    }

    /**
     * Writes this HLL in the BE-compatible binary format. Multi-byte values are
     * written little-endian (hence the reverseBytes calls on the big-endian
     * DataOutput primitives).
     *
     * @throws IOException if the underlying output fails
     */
    public void serialize(DataOutput output) throws IOException {
        switch (type) {
            case HLL_DATA_EMPTY:
                output.writeByte(type);
                break;
            case HLL_DATA_EXPLICIT:
                output.writeByte(type);
                // Count fits in an unsigned byte: at most HLL_EXPLICLIT_INT64_NUM (160).
                output.writeByte(hashSet.size());
                for (long value : hashSet) {
                    output.writeLong(Long.reverseBytes(value));
                }
                break;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                int nonZeroRegisterNum = 0;
                for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                    if (registers[i] != 0) {
                        nonZeroRegisterNum++;
                    }
                }
                if (nonZeroRegisterNum > HLL_SPARSE_THRESHOLD) {
                    // Dense: tag byte followed by all 16K registers.
                    output.writeByte(HLL_DATA_FULL);
                    for (byte value : registers) {
                        output.writeByte(value);
                    }
                } else {
                    // Sparse: tag, little-endian count, then (index, value) pairs.
                    output.writeByte(HLL_DATA_SPARSE);
                    output.writeInt(Integer.reverseBytes(nonZeroRegisterNum));
                    for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                        if (registers[i] != 0) {
                            output.writeShort(Short.reverseBytes((short) i));
                            output.writeByte(registers[i]);
                        }
                    }
                }
                break;
        }
    }

    /**
     * Reads a BE-compatible serialized HLL into this (must-be-empty) instance.
     *
     * @return false if {@code input} is null or the type tag is unknown
     * @throws IOException if the underlying input fails
     */
    public boolean deserialize(DataInput input) throws IOException {
        assert type == HLL_DATA_EMPTY;

        if (input == null) {
            return false;
        }

        this.type = input.readByte();
        switch (this.type) {
            case HLL_DATA_EMPTY:
                break;
            case HLL_DATA_EXPLICIT:
                int hashSetSize = input.readUnsignedByte();
                for (int i = 0; i < hashSetSize; i++) {
                    update(Long.reverseBytes(input.readLong()));
                }
                // The stored count never exceeds HLL_EXPLICLIT_INT64_NUM, so
                // replaying it cannot have promoted the type.
                assert this.type == HLL_DATA_EXPLICIT;
                break;
            case HLL_DATA_SPARSE:
                int sparseDataSize = Integer.reverseBytes(input.readInt());
                this.registers = new byte[HLL_REGISTERS_COUNT];
                for (int i = 0; i < sparseDataSize; i++) {
                    // Register indexes are < 16384, so they fit in a positive short.
                    int idx = Short.reverseBytes(input.readShort());
                    byte value = input.readByte();
                    registers[idx] = value;
                }
                break;
            case HLL_DATA_FULL:
                this.registers = new byte[HLL_REGISTERS_COUNT];
                for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                    registers[i] = input.readByte();
                }
                break;
            default:
                return false;
        }

        return true;
    }

    /**
     * Estimates the number of distinct values added so far.
     * strictfp forces IEEE 754 semantics so the result matches the C++ version
     * bit-for-bit on all platforms.
     */
    public strictfp long estimateCardinality() {
        if (type == HLL_DATA_EMPTY) {
            return 0;
        }
        if (type == HLL_DATA_EXPLICIT) {
            // Still exact: every distinct hash is stored.
            return hashSet.size();
        }

        int numStreams = HLL_REGISTERS_COUNT;
        float alpha = 0;

        // Standard HLL alpha constants per register count (Flajolet et al.).
        if (numStreams == 16) {
            alpha = 0.673f;
        } else if (numStreams == 32) {
            alpha = 0.697f;
        } else if (numStreams == 64) {
            alpha = 0.709f;
        } else {
            alpha = 0.7213f / (1 + 1.079f / numStreams);
        }

        float harmonicMean = 0;
        int numZeroRegisters = 0;

        for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
            harmonicMean += Math.pow(2.0f, -registers[i]);

            if (registers[i] == 0) {
                numZeroRegisters++;
            }
        }

        harmonicMean = 1.0f / harmonicMean;
        double estimate = alpha * numStreams * numStreams * harmonicMean;

        if (estimate <= numStreams * 2.5 && numZeroRegisters != 0) {
            // Small-range correction: fall back to linear counting.
            estimate = numStreams * Math.log(((float) numStreams) / ((float) numZeroRegisters));
        } else if (numStreams == 16384 && estimate < 72000) {
            // Empirical bias-correction polynomial; must match the C++ version.
            double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate)
                    - 1.4253 * 1.0e-12 * (estimate * estimate * estimate)
                    + 1.2940 * 1.0e-7 * (estimate * estimate)
                    - 5.2921 * 1.0e-3 * estimate
                    + 83.3216;
            estimate -= estimate * (bias / 100);
        }

        // Round to nearest integer.
        return (long) (estimate + 0.5);
    }

    /** Upper bound on the number of bytes {@link #serialize} will write. */
    public int maxSerializedSize() {
        switch (type) {
            case HLL_DATA_EMPTY:
            default:
                return 1;
            case HLL_DATA_EXPLICIT:
                // tag + count byte + 8 bytes per stored hash
                return 2 + hashSet.size() * 8;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                // FULL size; also bounds SPARSE (5 + 3 * HLL_SPARSE_THRESHOLD < this).
                return 1 + HLL_REGISTERS_COUNT;
        }
    }

    /** Reads 8 bytes at {@code index} as a little-endian long. */
    private static long getLittleEndianLong(final byte[] data, final int index) {
        return (((long) data[index    ] & 0xff)      ) |
               (((long) data[index + 1] & 0xff) <<  8) |
               (((long) data[index + 2] & 0xff) << 16) |
               (((long) data[index + 3] & 0xff) << 24) |
               (((long) data[index + 4] & 0xff) << 32) |
               (((long) data[index + 5] & 0xff) << 40) |
               (((long) data[index + 6] & 0xff) << 48) |
               (((long) data[index + 7] & 0xff) << 56);
    }

    /**
     * MurmurHash2 64A over {@code data[0..length)}. Must produce the same value
     * as the BE's HashUtil::murmur_hash64A for FE/BE HLL compatibility.
     */
    public static long hash64(final byte[] data, final int length, final int seed) {
        long h = (seed & 0xffffffffL) ^ (length * M64);
        final int nblocks = length >> 3;

        // Body: mix 8 bytes at a time.
        for (int i = 0; i < nblocks; i++) {
            final int index = (i << 3);
            long k = getLittleEndianLong(data, index);

            k *= M64;
            k ^= k >>> R64;
            k *= M64;

            h ^= k;
            h *= M64;
        }

        // Tail: fold in the remaining 1-7 bytes (intentional fall-through).
        final int index = (nblocks << 3);
        switch (length - index) {
            case 7:
                h ^= ((long) data[index + 6] & 0xff) << 48;
            case 6:
                h ^= ((long) data[index + 5] & 0xff) << 40;
            case 5:
                h ^= ((long) data[index + 4] & 0xff) << 32;
            case 4:
                h ^= ((long) data[index + 3] & 0xff) << 24;
            case 3:
                h ^= ((long) data[index + 2] & 0xff) << 16;
            case 2:
                h ^= ((long) data[index + 1] & 0xff) << 8;
            case 1:
                h ^= ((long) data[index] & 0xff);
                h *= M64;
        }

        // Finalization.
        h ^= h >>> R64;
        h *= M64;
        h ^= h >>> R64;

        return h;
    }

    // just for ut
    public int getType() {
        return type;
    }

}
b/fe/src/test/java/org/apache/doris/load/loadv2/HllTest.java new file mode 100644 index 0000000..958e96f --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/loadv2/HllTest.java @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.doris.load.loadv2.Hll.*; + +public class HllTest { + + @Test + public void testFindFirstNonZeroBitPosition() { + Assert.assertTrue(getLongTailZeroNum(0) == 0); + Assert.assertTrue(getLongTailZeroNum(1) == 0); + Assert.assertTrue(getLongTailZeroNum(1l << 30) == 30); + Assert.assertTrue(getLongTailZeroNum(1l << 62) == 62); + } + + @Test + public void HllBasicTest() throws IOException { + // test empty + Hll emptyHll = new Hll(); + + Assert.assertTrue(emptyHll.getType() == HLL_DATA_EMPTY); + Assert.assertTrue(emptyHll.estimateCardinality() == 0); + + ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(emptyOutputStream); + emptyHll.serialize(output); + DataInputStream emptyInputStream = new DataInputStream(new ByteArrayInputStream(emptyOutputStream.toByteArray())); + Hll deserializedEmptyHll = new Hll(); + deserializedEmptyHll.deserialize(emptyInputStream); + Assert.assertTrue(deserializedEmptyHll.getType() == HLL_DATA_EMPTY); + + // test explicit + Hll explicitHll = new Hll(); + for (int i = 0; i < HLL_EXPLICLIT_INT64_NUM; i++) { + explicitHll.updateWithHash(i); + } + Assert.assertTrue(explicitHll.getType() == HLL_DATA_EXPLICIT); + Assert.assertTrue(explicitHll.estimateCardinality() == HLL_EXPLICLIT_INT64_NUM); + + ByteArrayOutputStream explicitOutputStream = new ByteArrayOutputStream(); + DataOutput explicitOutput = new DataOutputStream(explicitOutputStream); + explicitHll.serialize(explicitOutput); + DataInputStream explicitInputStream = new DataInputStream(new ByteArrayInputStream(explicitOutputStream.toByteArray())); + Hll deserializedExplicitHll = new Hll(); + 
deserializedExplicitHll.deserialize(explicitInputStream); + Assert.assertTrue(deserializedExplicitHll.getType() == HLL_DATA_EXPLICIT); + + // test sparse + Hll sparseHll = new Hll(); + for (int i = 0; i < HLL_SPARSE_THRESHOLD; i++) { + sparseHll.updateWithHash(i); + } + Assert.assertTrue(sparseHll.getType() == HLL_DATA_FULL); + // 2% error rate + Assert.assertTrue(sparseHll.estimateCardinality() > HLL_SPARSE_THRESHOLD * (1 - 0.02) && + sparseHll.estimateCardinality() < HLL_SPARSE_THRESHOLD * (1 + 0.02)); + + ByteArrayOutputStream sparseOutputStream = new ByteArrayOutputStream(); + DataOutput sparseOutput = new DataOutputStream(sparseOutputStream); + sparseHll.serialize(sparseOutput); + DataInputStream sparseInputStream = new DataInputStream(new ByteArrayInputStream(sparseOutputStream.toByteArray())); + Hll deserializedSparseHll = new Hll(); + deserializedSparseHll.deserialize(sparseInputStream); + Assert.assertTrue(deserializedSparseHll.getType() == HLL_DATA_SPARSE); + Assert.assertTrue(sparseHll.estimateCardinality() == deserializedSparseHll.estimateCardinality()); + + + // test full + Hll fullHll = new Hll(); + for (int i = 1; i <= Short.MAX_VALUE; i++) { + fullHll.updateWithHash(i); + } + Assert.assertTrue(fullHll.getType() == HLL_DATA_FULL); + // the result 32748 is consistent with C++ 's implementation + Assert.assertTrue(fullHll.estimateCardinality() == 32748); + Assert.assertTrue(fullHll.estimateCardinality() > Short.MAX_VALUE * (1 - 0.02) && + fullHll.estimateCardinality() < Short.MAX_VALUE * (1 + 0.02)); + + ByteArrayOutputStream fullHllOutputStream = new ByteArrayOutputStream(); + DataOutput fullHllOutput = new DataOutputStream(fullHllOutputStream); + fullHll.serialize(fullHllOutput); + DataInputStream fullHllInputStream = new DataInputStream(new ByteArrayInputStream(fullHllOutputStream.toByteArray())); + Hll deserializedFullHll = new Hll(); + deserializedFullHll.deserialize(fullHllInputStream); + Assert.assertTrue(deserializedFullHll.getType() == 
HLL_DATA_FULL); + Assert.assertTrue(deserializedFullHll.estimateCardinality() == fullHll.estimateCardinality()); + + } + + // keep logic same with C++ version + // add additional compare logic with C++ version's estimateValue + @Test + public void testCompareEstimateValueWithBe() throws IOException { + //empty + { + Hll hll = new Hll(); + long estimateValue = hll.estimateCardinality(); + byte[] serializedByte = serializeHll(hll); + hll = deserializeHll(serializedByte); + + Assert.assertTrue(estimateValue == hll.estimateCardinality()); + } + + // explicit [0. 100) + Hll explicitHll = new Hll(); + { + for (int i = 0; i < 100; i++) { + explicitHll.updateWithHash(i); + } + Assert.assertTrue(explicitHll.estimateCardinality() == 100); + // check serialize + byte[] serializeHll = serializeHll(explicitHll); + explicitHll = deserializeHll(serializeHll); + Assert.assertTrue(explicitHll.estimateCardinality() == 100); + + Hll otherHll = new Hll(); + for (int i = 0; i < 100; i++) { + otherHll.updateWithHash(i); + } + explicitHll.merge(otherHll); + // compare with C++ version result + Assert.assertTrue(explicitHll.estimateCardinality() == 100); + } + + // sparse [1024, 2048) + Hll sparseHll = new Hll(); + { + for (int i = 0; i < 1024; i++) { + sparseHll.updateWithHash(i + 1024); + } + + long preValue = sparseHll.estimateCardinality(); + // check serialize + byte[] serializedHll = serializeHll(sparseHll); + Assert.assertTrue(serializedHll.length < HLL_REGISTERS_COUNT + 1); + + sparseHll = deserializeHll(serializedHll); + Assert.assertTrue(sparseHll.estimateCardinality() == preValue); + Assert.assertTrue(sparseHll.getType() == HLL_DATA_SPARSE); + + Hll otherHll = new Hll(); + for (int i = 0; i < 1024; i++) { + otherHll.updateWithHash(i + 1024); + } + sparseHll.updateWithHash(1024); + sparseHll.merge(otherHll); + long cardinality = sparseHll.estimateCardinality(); + Assert.assertTrue(preValue == cardinality); + // 2% error rate + Assert.assertTrue(cardinality > 1000 && cardinality 
< 1045); + // compare with C++ version result + Assert.assertTrue(cardinality == 1023); + } + + // full [64 * 1024, 128 * 1024) + Hll fullHll = new Hll(); + { + for (int i = 0; i < 64 * 1024; i++) { + fullHll.updateWithHash(64 * 1024 + i); + } + + long preValue = fullHll.estimateCardinality(); + // check serialize + byte[] serializedHll = serializeHll(fullHll); + fullHll = deserializeHll(serializedHll); + Assert.assertTrue(fullHll.estimateCardinality() == preValue); + Assert.assertTrue(serializedHll.length == HLL_REGISTERS_COUNT + 1); + + // 2% error rate + Assert.assertTrue(preValue > 62 * 1024 && preValue < 66 * 1024); + + // compare with C++ version result + Assert.assertTrue(preValue == 66112); + } + + // merge explicit to empty_hll + { + Hll newExplicit = new Hll(); + newExplicit.merge(explicitHll); + Assert.assertTrue(newExplicit.estimateCardinality() == 100); + + // merge another explicit + { + Hll otherHll = new Hll(); + for (int i = 100; i < 200; i++) { + otherHll.updateWithHash(i); + } + // this is converted to full + otherHll.merge(newExplicit); + Assert.assertTrue(otherHll.estimateCardinality() > 190); + // compare with C++ version result + Assert.assertTrue(otherHll.estimateCardinality() == 201); + } + // merge full + { + newExplicit.merge(fullHll); + Assert.assertTrue(newExplicit.estimateCardinality() > fullHll.estimateCardinality()); + // compare with C++ version result + Assert.assertTrue(newExplicit.estimateCardinality() == 66250); + } + } + + // merge sparse into empty + { + Hll newSparseHll = new Hll(); + newSparseHll.merge(sparseHll); + Assert.assertTrue(sparseHll.estimateCardinality() == newSparseHll.estimateCardinality()); + // compare with C++ version result + Assert.assertTrue(newSparseHll.estimateCardinality() == 1023); + + // merge explicit + newSparseHll.merge(explicitHll); + Assert.assertTrue(newSparseHll.estimateCardinality() > sparseHll.estimateCardinality()); + // compare with C++ version result + 
Assert.assertTrue(newSparseHll.estimateCardinality() == 1123); + + // merge full + newSparseHll.merge(fullHll); + Assert.assertTrue(newSparseHll.estimateCardinality() > fullHll.estimateCardinality()); + // compare with C++ version result + Assert.assertTrue(newSparseHll.estimateCardinality() == 67316); + } + + } + + private byte[] serializeHll(Hll hll) throws IOException { + ByteArrayOutputStream fullHllOutputStream = new ByteArrayOutputStream(); + DataOutput fullHllOutput = new DataOutputStream(fullHllOutputStream); + hll.serialize(fullHllOutput); + return fullHllOutputStream.toByteArray(); + } + + private Hll deserializeHll(byte[] hllBytes) throws IOException { + DataInputStream fullHllInputStream = new DataInputStream(new ByteArrayInputStream(hllBytes)); + Hll hll = new Hll(); + hll.deserialize(fullHllInputStream); + return hll; + } + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org