davecromberge commented on code in PR #10288:
URL: https://github.com/apache/pinot/pull/10288#discussion_r1108528024


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.theta.SingleItemSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<Object, Sketch> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTTHETASKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  // This changes a lot similar to the Bitmap aggregator
+  private int _maxByteSize;
+
+  // Utility method to create a theta sketch with one item in it
+  private Sketch singleItemSketch(Object rawValue) {
+    return SingleItemSketch.create(rawValue.hashCode()).compact();
+  }
+
+  // Utility method to merge two sketches
+  private Sketch union(Sketch left, Sketch right) {
+    // TODO: Handle configurable nominal entries for StarTreeBuilder
+    Union u = Union.builder()
+      .setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES)
+      .buildUnion();
+    u.update(left);
+    u.update(right);
+    return u.getResult().compact();

Review Comment:
   **suggestion:** You could also construct a single `Union` up front (for example, as a field) and apply its stateless pairwise operation here:
   
   `u.union(left, right)`
   
   This returns a compact sketch directly, so the explicit `compact()` call would no longer be needed.
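For context on what `setNominalEntries` governs in the union above, the following is a toy, stdlib-only model of a KMV-style theta union. It is not the DataSketches implementation: `ToySketch`, `ToyThetaUnion`, and the modelling of hashes as `double` values in [0, 1) are assumptions made for illustration. Mirroring the shape of the suggestion, the union is configured once with `k` and exposes a pairwise `union(a, b)` that returns a new sketch.

```java
import java.util.TreeSet;

/** Toy theta (KMV) sketch: keeps the hashes in [0, 1) that fall strictly below theta. */
final class ToySketch {
  final double theta;
  final TreeSet<Double> hashes;

  ToySketch(double theta, TreeSet<Double> hashes) {
    this.theta = theta;
    this.hashes = hashes;
  }

  /** Builds a sketch from pre-hashed values, retaining only those below theta. */
  static ToySketch of(double theta, double... hashValues) {
    TreeSet<Double> retained = new TreeSet<>();
    for (double h : hashValues) {
      if (h < theta) {
        retained.add(h);
      }
    }
    return new ToySketch(theta, retained);
  }

  /** Distinct-count estimate: retained entries divided by the sampling rate theta. */
  double estimate() {
    return hashes.size() / theta;
  }
}

/** Hypothetical union configured once with k (the nominal entries) and reused for every merge. */
final class ToyThetaUnion {
  private final int k;

  ToyThetaUnion(int k) {
    this.k = k;
  }

  /** Pairwise union: returns a new sketch and leaves this object and both inputs unchanged. */
  ToySketch union(ToySketch a, ToySketch b) {
    // The result can only sample as finely as the coarser (smaller-theta) input.
    double theta = Math.min(a.theta, b.theta);
    TreeSet<Double> merged = new TreeSet<>();
    for (double h : a.hashes) {
      if (h < theta) {
        merged.add(h);
      }
    }
    for (double h : b.hashes) {
      if (h < theta) {
        merged.add(h);
      }
    }
    if (merged.size() > k) {
      // Keep the k smallest hashes; the (k+1)-th smallest becomes the new theta.
      theta = merged.stream().skip(k).findFirst().get();
      merged = new TreeSet<>(merged.headSet(theta, false));
    }
    return new ToySketch(theta, merged);
  }
}
```

For example, `new ToyThetaUnion(2).union(ToySketch.of(1.0, 0.1, 0.5), ToySketch.of(1.0, 0.3, 0.7))` retains the two smallest hashes, lowers theta to 0.5, and estimates 4.0. In this toy, reusing one union object changes nothing about the result; it only avoids constructing a new one per merge.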



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.theta.SingleItemSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<Object, Sketch> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTTHETASKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  // This changes a lot similar to the Bitmap aggregator
+  private int _maxByteSize;
+
+  // Utility method to create a theta sketch with one item in it
+  private Sketch singleItemSketch(Object rawValue) {
+    return SingleItemSketch.create(rawValue.hashCode()).compact();
+  }
+
+  // Utility method to merge two sketches
+  private Sketch union(Sketch left, Sketch right) {
+    // TODO: Handle configurable nominal entries for StarTreeBuilder
+    Union u = Union.builder()
+      .setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES)
+      .buildUnion();
+    u.update(left);
+    u.update(right);
+    return u.getResult().compact();
+  }
+
+  @Override
+  public Sketch getInitialAggregatedValue(Object rawValue) {
+    Sketch initialValue;
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+    } else {
+      initialValue = singleItemSketch(rawValue);
+      _maxByteSize = Math.max(_maxByteSize, initialValue.getCurrentBytes(true));
+    }
+    return initialValue;

Review Comment:
   **question:** if an end user attempts to build this index on a BYTES column that does not hold serialised theta sketches, would `deserializeAggregatedValue` blow up? Do you see room for a system-level improvement in how type information for these values is conveyed?
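One way to make a misconfigured column fail fast with a clear message is to sanity-check the bytes before handing them to the deserializer. The sketch below assumes the current DataSketches theta serialization layout, in which byte 2 of the preamble holds the family ID (1 = Alpha, 2 = QuickSelect, 3 = Compact) and the smallest image is one 8-byte preamble long; `ThetaBytesCheck` and `requireThetaSketchBytes` are hypothetical names. It is only a cheap heuristic: random bytes whose third byte happens to be 1, 2, or 3 would still pass.

```java
/** Hypothetical fail-fast guard for BYTES values that are expected to hold theta sketches. */
final class ThetaBytesCheck {
  // Assumed layout: byte 2 of a serialized theta sketch preamble is the family ID.
  private static final int FAMILY_BYTE_OFFSET = 2;
  // Assumed family IDs of sketch (not union) images: Alpha, QuickSelect, Compact.
  private static final int ALPHA = 1;
  private static final int QUICKSELECT = 2;
  private static final int COMPACT = 3;
  // Assumed minimum size: one 8-byte preamble long.
  private static final int MIN_LENGTH = 8;

  private ThetaBytesCheck() {
  }

  /** Returns the input unchanged, or throws a descriptive error naming the column. */
  static byte[] requireThetaSketchBytes(byte[] bytes, String columnName) {
    if (bytes == null || bytes.length < MIN_LENGTH) {
      String size = bytes == null ? "null" : bytes.length + " bytes";
      throw new IllegalArgumentException("Column '" + columnName + "' holds " + size
          + ", which is too short to be a serialized theta sketch");
    }
    int family = bytes[FAMILY_BYTE_OFFSET] & 0xFF;
    if (family != ALPHA && family != QUICKSELECT && family != COMPACT) {
      throw new IllegalArgumentException("Column '" + columnName
          + "' does not look like a serialized theta sketch (family ID " + family + ")");
    }
    return bytes;
  }
}
```

A guard like this could be called in `getInitialAggregatedValue` just before `deserializeAggregatedValue(bytes)`; a stronger option would attempt the full deserialization and rethrow any failure with the column name attached.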



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java:
##########
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class DistinctCountThetaSketchValueAggregatorTest {
+
+
+    @Test
+    public void initialShouldCreateSingleItemSketch() {
+        DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator();
+        assertEquals(
+                agg.getInitialAggregatedValue("hello world").getEstimate(),
+                1.0
+        );
+    }
+
+    @Test
+    public void initialShouldParseASketch() {
+        UpdateSketch input = Sketches.updateSketchBuilder().build();
+        IntStream.range(0, 1000).forEach(input::update);
+        Sketch result = input.compact();
+        DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator();
+        byte[] bytes = agg.serializeAggregatedValue(result);
+        assertEquals(
+                agg.getInitialAggregatedValue(bytes).getEstimate(),
+                result.getEstimate()
+        );
+
+        // and should update the max size
+        assertEquals(
+                agg.getMaxAggregatedValueByteSize(),
+                result.getCurrentBytes(true)
+        );
+    }
+
+    @Test
+    public void applyAggregatedValueShouldUnion() {
+        UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+        IntStream.range(0, 1000).forEach(input1::update);
+        Sketch result1 = input1.compact();
+        UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+        IntStream.range(0, 1000).forEach(input2::update);
+        Sketch result2 = input1.compact();
+        DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator();
+        Sketch result = agg.applyAggregatedValue(result1, result2);
+        Union union = Union.builder()
+          .setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion();
+        union.update(result1);
+        union.update(result2);
+
+        assertEquals(
+                result.getEstimate(),
+                union.getResult().getEstimate()
+        );
+
+        // and should update the max size
+        assertEquals(
+                agg.getMaxAggregatedValueByteSize(),
+                union.getResult().getCurrentBytes(true)
+        );
+    }
+
+    @Test
+    public void applyRawValueShouldUnion() {
+        UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+        IntStream.range(0, 1000).forEach(input1::update);
+        Sketch result1 = input1.compact();
+        UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+        IntStream.range(0, 1000).forEach(input2::update);
+        Sketch result2 = input1.compact();

Review Comment:
   **question:** did you mean `input2.compact()`? The same applies to `result2` in `applyAggregatedValueShouldUnion`.
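Even with `input2.compact()`, both inputs in this test are fed the same range, `IntStream.range(0, 1000)`, so their union has the same cardinality as either input, which makes it hard for any assertion to tell a correct union apart from an aggregator that returns one argument unchanged. The stdlib-only snippet below illustrates the point with exact sets standing in for sketches (`OverlapDemo`, `rangeSet`, and `union` are illustrative names); a disjoint second range such as 1000 to 1999 would make the union observably larger.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Exact sets standing in for sketches, to show how input overlap affects a union test. */
final class OverlapDemo {
  private OverlapDemo() {
  }

  /** The distinct values in [from, to), as the test feeds them via IntStream.range. */
  static Set<Integer> rangeSet(int from, int to) {
    return IntStream.range(from, to).boxed().collect(Collectors.toSet());
  }

  /** Exact union of two value sets. */
  static Set<Integer> union(Set<Integer> a, Set<Integer> b) {
    Set<Integer> out = new HashSet<>(a);
    out.addAll(b);
    return out;
  }
}
```

Here `union(rangeSet(0, 1000), rangeSet(0, 1000)).size()` is 1000, the same as either input, whereas `union(rangeSet(0, 1000), rangeSet(1000, 2000)).size()` is 2000.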



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.theta.SingleItemSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<Object, Sketch> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTTHETASKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  // This changes a lot similar to the Bitmap aggregator
+  private int _maxByteSize;
+
+  // Utility method to create a theta sketch with one item in it
+  private Sketch singleItemSketch(Object rawValue) {
+    return SingleItemSketch.create(rawValue.hashCode()).compact();

Review Comment:
   **nitpick:** there is no need to call `compact()` here, because a `SingleItemSketch` is already a compact sketch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

