Jackie-Jiang commented on a change in pull request #5451: URL: https://github.com/apache/incubator-pinot/pull/5451#discussion_r432774276
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java ########## @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.customobject; + +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.SelectionSort; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableFactory; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.spi.utils.ByteArray; + + +/** + * The {@code DistinctTable} class serves as the intermediate result of {@code DistinctAggregationFunction}. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class DistinctTable { + private static final int MAX_INITIAL_CAPACITY = 10000; + + private final DataSchema _dataSchema; + private final int _limit; + private final Set<Record> _uniqueRecords; + private final PriorityQueue<Record> _sortedRecords; + private final List<Record> _records; + + /** + * Constructor of the main {@code DistinctTable} which can be used to add records and merge other + * {@code DistinctTable}s. + */ + public DistinctTable(DataSchema dataSchema, @Nullable List<SelectionSort> orderBy, int limit) { + _dataSchema = dataSchema; + _limit = limit; + + // TODO: see if 10k is the right max initial capacity to use + // NOTE: When LIMIT is smaller than or equal to the MAX_INITIAL_CAPACITY, no resize is required. + int initialCapacity = Math.min(limit, MAX_INITIAL_CAPACITY); + _uniqueRecords = new ObjectOpenHashSet<>(initialCapacity); + if (orderBy != null) { + String[] columns = dataSchema.getColumnNames(); + int numColumns = columns.length; + Object2IntOpenHashMap<String> columnIndexMap = new Object2IntOpenHashMap<>(numColumns); + for (int i = 0; i < numColumns; i++) { + columnIndexMap.put(columns[i], i); + } + int numOrderByColumns = orderBy.size(); + int[] orderByColumnIndexes = new int[numOrderByColumns]; + boolean[] orderByAsc = new boolean[numOrderByColumns]; + for (int i = 0; i < numOrderByColumns; i++) { + SelectionSort selectionSort = orderBy.get(i); + orderByColumnIndexes[i] = columnIndexMap.getInt(selectionSort.getColumn()); + orderByAsc[i] = selectionSort.isIsAsc(); + } + _sortedRecords = new PriorityQueue<>(initialCapacity, (record1, record2) -> { + Object[] values1 = record1.getValues(); + Object[] values2 = record2.getValues(); + for (int i = 0; i < numOrderByColumns; i++) { + Comparable valueToCompare1 = (Comparable) values1[orderByColumnIndexes[i]]; + Comparable valueToCompare2 = (Comparable) values2[orderByColumnIndexes[i]]; + int result = + orderByAsc[i] ? 
valueToCompare2.compareTo(valueToCompare1) : valueToCompare1.compareTo(valueToCompare2); + if (result != 0) { + return result; + } + } + return 0; + }); + } else { + _sortedRecords = null; + } + _records = null; + } + + /** + * Returns the {@code DataSchema} of the {@code DistinctTable}. + */ + public DataSchema getDataSchema() { + return _dataSchema; + } + + /** + * Returns the number of unique records within the {@code DistinctTable}. + */ + public int size() { + if (_uniqueRecords != null) { + // Server-side + return _uniqueRecords.size(); + } else { + // Broker-side + return _records.size(); + } + } + + /** + * Adds a record into the DistinctTable and returns whether more records should be added. No more records should be + * added iff there is no order-by column and enough unique records have been collected. + */ + public boolean add(Record record) { + if (_uniqueRecords.contains(record)) { Review comment: For non-order-by, yes we can save this extra contains check. For order-by, we should still first check if it contains the record to avoid an unnecessary remove. More importantly, blindly adding a record can result in unexpected expansion of the map. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org