This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9b49bca554 Add Theta Sketch Aggregation for StarTree Indexes (#10288) 9b49bca554 is described below commit 9b49bca55476b431171f4face5f28e6aeef9fe7f Author: Andi Miller <a...@andimiller.net> AuthorDate: Thu Feb 23 19:34:38 2023 +0000 Add Theta Sketch Aggregation for StarTree Indexes (#10288) --- .../v2/DistinctCountThetaSketchStarTreeV2Test.java | 51 ++++++ .../DistinctCountThetaSketchValueAggregator.java | 178 +++++++++++++++++++++ .../local/aggregator/ValueAggregatorFactory.java | 6 + ...istinctCountThetaSketchValueAggregatorTest.java | 165 +++++++++++++++++++ .../apache/pinot/spi/utils/CommonConstants.java | 20 ++- 5 files changed, 412 insertions(+), 8 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountThetaSketchStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountThetaSketchStarTreeV2Test.java new file mode 100644 index 0000000000..4e924c9d0c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountThetaSketchStarTreeV2Test.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.startree.v2; + +import java.util.Random; +import org.apache.datasketches.theta.Sketch; +import org.apache.pinot.segment.local.aggregator.DistinctCountThetaSketchValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.spi.data.FieldSpec.DataType; + +import static org.testng.Assert.assertEquals; + + +public class DistinctCountThetaSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, Sketch> { + + @Override + ValueAggregator<Object, Sketch> getValueAggregator() { + return new DistinctCountThetaSketchValueAggregator(); + } + + @Override + DataType getRawValueType() { + return DataType.INT; + } + + @Override + Object getRandomRawValue(Random random) { + return random.nextInt(100); + } + + @Override + void assertAggregatedValue(Sketch starTreeResult, Sketch nonStarTreeResult) { + assertEquals(starTreeResult.getEstimate(), nonStarTreeResult.getEstimate()); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java new file mode 100644 index 0000000000..6acba3ed05 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.aggregator; + +import java.util.Arrays; +import java.util.stream.StreamSupport; +import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.theta.Sketches; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.pinot.segment.local.utils.CustomSerDeUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.CommonConstants; + + +public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<Object, Sketch> { + public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES; + + private final Union _union; + + // This changes a lot similar to the Bitmap aggregator + private int _maxByteSize; + + public DistinctCountThetaSketchValueAggregator() { + // TODO: Handle configurable nominal entries for StarTreeBuilder + _union = Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion(); + } + + @Override + public AggregationFunctionType getAggregationType() { + return AggregationFunctionType.DISTINCTCOUNTTHETASKETCH; + } + + @Override + public DataType getAggregatedValueType() { + return AGGREGATED_VALUE_TYPE; + } + + // Utility method to create a theta sketch with one item in it + private Sketch singleItemSketch(Object rawValue) { + // TODO: Handle configurable nominal entries for StarTreeBuilder + UpdateSketch sketch = + Sketches.updateSketchBuilder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES) + .build(); + if (rawValue instanceof String) { + sketch.update((String) rawValue); + } else if (rawValue instanceof Integer) { + sketch.update((Integer) rawValue); + } else if (rawValue instanceof Long) { + sketch.update((Long) rawValue); + } else if (rawValue instanceof Double) { + sketch.update((Double) rawValue); + } else if (rawValue instanceof Float) { + sketch.update((Float) rawValue); + } else if (rawValue instanceof Object[]) { + addObjectsToSketch((Object[]) rawValue, sketch); + } else { + throw new IllegalStateException( + "Unsupported data type for Theta Sketch aggregation: " + rawValue.getClass().getName()); + } + return sketch.compact(); + } + + private void addObjectsToSketch(Object[] rawValues, UpdateSketch updateSketch) { + if (rawValues instanceof String[]) { + for (String s : (String[]) rawValues) { + updateSketch.update(s); + } + } else if (rawValues instanceof Integer[]) { + for (Integer i : (Integer[]) rawValues) { + updateSketch.update(i); + } + } else if (rawValues instanceof Long[]) { + for (Long l : (Long[]) rawValues) { + updateSketch.update(l); + } + } else if (rawValues instanceof Double[]) { + for (Double d : (Double[]) rawValues) { + updateSketch.update(d); + } + } else if (rawValues instanceof Float[]) { + for (Float f : (Float[]) rawValues) { + updateSketch.update(f); + } + } else { + throw new IllegalStateException( + "Unsupported data type for Theta Sketch aggregation: " + rawValues.getClass().getName()); + } + } + + // Utility method to merge two sketches + private Sketch union(Sketch left, Sketch right) { + return _union.union(left, right); + } + + // Utility method to make an empty sketch + private Sketch empty() { + // TODO: Handle configurable nominal entries for StarTreeBuilder + return Sketches.updateSketchBuilder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES) + .build().compact(); + } + + @Override + public Sketch getInitialAggregatedValue(Object rawValue) { + Sketch initialValue; + if (rawValue instanceof byte[]) { // Serialized Sketch + byte[] bytes = (byte[]) rawValue; + initialValue = deserializeAggregatedValue(bytes); + _maxByteSize = Math.max(_maxByteSize, bytes.length); + } else if (rawValue instanceof byte[][]) { // Multiple Serialized Sketches + byte[][] serializedSketches = (byte[][]) rawValue; + initialValue = StreamSupport.stream(Arrays.stream(serializedSketches).spliterator(), false) + .map(this::deserializeAggregatedValue).reduce(this::union).orElseGet(this::empty); + _maxByteSize = Math.max(_maxByteSize, initialValue.getCurrentBytes()); + } else { + initialValue = singleItemSketch(rawValue); + _maxByteSize = Math.max(_maxByteSize, initialValue.getCurrentBytes()); + } + return initialValue; + } + + @Override + public Sketch applyRawValue(Sketch value, Object rawValue) { + Sketch right; + if (rawValue instanceof byte[]) { + right = deserializeAggregatedValue((byte[]) rawValue); + } else { + right = singleItemSketch(rawValue); + } + Sketch result = union(value, right).compact(); + _maxByteSize = Math.max(_maxByteSize, result.getCurrentBytes()); + return result; + } + + @Override + public Sketch applyAggregatedValue(Sketch value, Sketch aggregatedValue) { + Sketch result = union(value, aggregatedValue); + _maxByteSize = Math.max(_maxByteSize, result.getCurrentBytes()); + return result; + } + + @Override + public Sketch cloneAggregatedValue(Sketch value) { + return deserializeAggregatedValue(serializeAggregatedValue(value)); + } + + @Override + public int getMaxAggregatedValueByteSize() { + return _maxByteSize; + } + + @Override + public byte[] serializeAggregatedValue(Sketch value) { + return CustomSerDeUtils.DATA_SKETCH_SER_DE.serialize(value); + } + + @Override + public Sketch deserializeAggregatedValue(byte[] bytes) { + return CustomSerDeUtils.DATA_SKETCH_SER_DE.deserialize(bytes); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java index 092a285804..aa4bdb410b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java @@ -63,6 +63,9 @@ public class ValueAggregatorFactory { case PERCENTILETDIGEST: case PERCENTILERAWTDIGEST: return new PercentileTDigestValueAggregator(); + case DISTINCTCOUNTTHETASKETCH: + case DISTINCTCOUNTRAWTHETASKETCH: + return new DistinctCountThetaSketchValueAggregator(); default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } @@ -101,6 +104,9 @@ public class ValueAggregatorFactory { case PERCENTILETDIGEST: case PERCENTILERAWTDIGEST: return PercentileTDigestValueAggregator.AGGREGATED_VALUE_TYPE; + case DISTINCTCOUNTTHETASKETCH: + case DISTINCTCOUNTRAWTHETASKETCH: + return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE; default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java new file mode 100644 index 0000000000..822335cfb0 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.aggregator; + +import java.util.stream.IntStream; +import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.theta.Sketches; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.pinot.spi.utils.CommonConstants; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; + + +public class DistinctCountThetaSketchValueAggregatorTest { + + @Test + public void initialShouldCreateSingleItemSketch() { + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + assertEquals(agg.getInitialAggregatedValue("hello world").getEstimate(), 1.0); + } + + @Test + public void initialShouldParseASketch() { + UpdateSketch input = Sketches.updateSketchBuilder().build(); + IntStream.range(0, 1000).forEach(input::update); + Sketch result = input.compact(); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + byte[] bytes = agg.serializeAggregatedValue(result); + assertEquals(agg.getInitialAggregatedValue(bytes).getEstimate(), result.getEstimate()); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), result.getCurrentBytes()); + } + + @Test + public void initialShouldParseMultiValueSketches() { + UpdateSketch input1 = Sketches.updateSketchBuilder().build(); + input1.update("hello"); + UpdateSketch input2 = Sketches.updateSketchBuilder().build(); + input2.update("world"); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + byte[][] bytes = {agg.serializeAggregatedValue(input1), agg.serializeAggregatedValue(input2)}; + assertEquals(agg.getInitialAggregatedValue(bytes).getEstimate(), 2.0); + } + + @Test + public void applyAggregatedValueShouldUnion() { + UpdateSketch input1 = Sketches.updateSketchBuilder().build(); + IntStream.range(0, 1000).forEach(input1::update); + Sketch result1 = input1.compact(); + UpdateSketch input2 = Sketches.updateSketchBuilder().build(); + IntStream.range(0, 1000).forEach(input2::update); + Sketch result2 = input2.compact(); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + Sketch result = agg.applyAggregatedValue(result1, result2); + Union union = + Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion(); + + Sketch merged = union.union(result1, result2); + + assertEquals(result.getEstimate(), merged.getEstimate()); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), merged.getCurrentBytes()); + } + + @Test + public void applyRawValueShouldUnion() { + UpdateSketch input1 = Sketches.updateSketchBuilder().build(); + IntStream.range(0, 1000).forEach(input1::update); + Sketch result1 = input1.compact(); + UpdateSketch input2 = Sketches.updateSketchBuilder().build(); + IntStream.range(0, 1000).forEach(input2::update); + Sketch result2 = input2.compact(); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + byte[] result2bytes = agg.serializeAggregatedValue(result2); + Sketch result = agg.applyRawValue(result1, result2bytes); + Union union = + Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion(); + + Sketch merged = union.union(result1, result2); + + assertEquals(result.getEstimate(), merged.getEstimate()); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), merged.getCurrentBytes()); + } + + @Test + public void applyRawValueShouldAdd() { + UpdateSketch input1 = Sketches.updateSketchBuilder().build(); + input1.update("hello".hashCode()); + Sketch result1 = input1.compact(); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + Sketch result = agg.applyRawValue(result1, "world"); + + assertEquals(result.getEstimate(), 2.0); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), 32 // may change in future versions of datasketches + ); + } + + @Test + public void applyRawValueShouldSupportMultiValue() { + UpdateSketch input1 = Sketches.updateSketchBuilder().build(); + input1.update("hello"); + Sketch result1 = input1.compact(); + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + String[] strings = {"hello", "world", "this", "is", "some", "strings"}; + Sketch result = agg.applyRawValue(result1, (Object) strings); + + assertEquals(result.getEstimate(), 6.0); + + // and should update the max size + assertEquals(agg.getMaxAggregatedValueByteSize(), 64 // may change in future versions of datasketches + ); + } + + @Test + public void getInitialValueShouldSupportDifferentTypes() { + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + assertEquals(agg.getInitialAggregatedValue(12345).getEstimate(), 1.0); + assertEquals(agg.getInitialAggregatedValue(12345L).getEstimate(), 1.0); + assertEquals(agg.getInitialAggregatedValue(12.345f).getEstimate(), 1.0); + assertEquals(agg.getInitialAggregatedValue(12.345d).getEstimate(), 1.0); + assertThrows(() -> agg.getInitialAggregatedValue(new Object())); + } + + @Test + public void getInitialValueShouldSupportMultiValueTypes() { + DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(); + Integer[] ints = {12345}; + assertEquals(agg.getInitialAggregatedValue(ints).getEstimate(), 1.0); + Long[] longs = {12345L}; + assertEquals(agg.getInitialAggregatedValue(longs).getEstimate(), 1.0); + Float[] floats = {12.345f}; + assertEquals(agg.getInitialAggregatedValue(floats).getEstimate(), 1.0); + Double[] doubles = {12.345d}; + assertEquals(agg.getInitialAggregatedValue(doubles).getEstimate(), 1.0); + Object[] objects = {new Object()}; + assertThrows(() -> agg.getInitialAggregatedValue(objects)); + byte[][] zeroSketches = {}; + assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(), 0.0); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3a821c3b2b..2f0dedcb5b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -46,6 +46,7 @@ public class CommonConstants { "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory"; public static final String SWAGGER_AUTHORIZATION_KEY = "oauth"; + /** * The state of the consumer for a given segment */ @@ -91,6 +92,10 @@ public class CommonConstants { public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY = "default.hyperloglog.log2m"; public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8; + // 2 to the power of 16, for tradeoffs see datasketches library documentation: + // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html + public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536; + // Whether to rewrite DistinctCount to DistinctCountBitmap public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override"; @@ -262,8 +267,7 @@ public class CommonConstants { public static final String CONTROLLER_URL = "pinot.broker.controller.url"; - public static final String CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING = - "pinot.broker.request.client.ip.logging"; + public static final String CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING = "pinot.broker.request.client.ip.logging"; // TODO: Support populating clientIp for GrpcRequestIdentity. public static final boolean DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING = false; @@ -271,10 +275,10 @@ public class CommonConstants { public static final String CONFIG_OF_LOGGER_ROOT_DIR = "pinot.broker.logger.root.dir"; public static final String CONFIG_OF_SWAGGER_BROKER_ENABLED = "pinot.broker.swagger.enabled"; public static final boolean DEFAULT_SWAGGER_BROKER_ENABLED = true; - public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT - = "pinot.broker.instance.enableThreadCpuTimeMeasurement"; - public static final String CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT - = "pinot.broker.instance.enableThreadAllocatedBytesMeasurement"; + public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT = + "pinot.broker.instance.enableThreadCpuTimeMeasurement"; + public static final String CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT = + "pinot.broker.instance.enableThreadAllocatedBytesMeasurement"; public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT = false; public static final boolean DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT = false; @@ -772,8 +776,8 @@ public class CommonConstants { public static final String CONFIG_OF_SLEEP_TIME_DENOMINATOR = "accounting.sleep.time.denominator"; public static final int DEFAULT_SLEEP_TIME_DENOMINATOR = 3; - public static final String CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO - = "accounting.min.memory.footprint.to.kill.ratio"; + public static final String CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO = + "accounting.min.memory.footprint.to.kill.ratio"; public static final double DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO = 0.025; public static final String CONFIG_OF_GC_BACKOFF_COUNT = "accounting.gc.backoff.count"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org