This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 73d2839 Merge common APIs for Dictionary (#6176) 73d2839 is described below commit 73d2839ac1cd094072c3c031008d6335527c29da Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Oct 22 18:42:37 2020 -0700 Merge common APIs for Dictionary (#6176) Motivation: Currently the APIs for Dictionary are split across 3 places: `Dictionary`, `BaseImmutableDictionary`, `BaseMutableDictionary`. In order to use them, we need to cast the dictionary first, which is hard to manage and can potentially cause casting errors. E.g. #6174 is caused by casting an immutable dictionary to `BaseMutableDictionary`. We should move the common read APIs to the root `Dictionary` interface to avoid the casting, and let all types of dictionary support these APIs. Merge the following common APIs from `BaseImmutableDictionary` and `BaseMutableDictionary` to `Dictionary`: - `insertionIndexOf` - `getDictIdsInRange` - `compare` - `getMinVal` - `getMaxVal` - `getSortedValues` --- .../indexsegment/mutable/MutableSegmentImpl.java | 14 +-- .../core/operator/filter/FilterOperatorUtils.java | 4 +- .../filter/RangeIndexBasedFilterOperator.java | 8 +- .../filter/SortedIndexBasedFilterOperator.java | 10 +- .../predicate/RangePredicateEvaluatorFactory.java | 19 ++-- .../converter/stats/RealtimeColumnStatistics.java | 33 ++++--- .../impl/dictionary/BaseMutableDictionary.java | 72 --------------- .../dictionary/BaseOffHeapMutableDictionary.java | 3 +- .../dictionary/BaseOnHeapMutableDictionary.java | 3 +- .../impl/dictionary/MutableDictionaryFactory.java | 5 +- .../core/segment/index/readers/BaseDictionary.java | 72 --------------- .../index/readers/BaseImmutableDictionary.java | 43 +++++++-- .../segment/index/readers/BytesDictionary.java | 15 ++- .../readers/ConstantValueBytesDictionary.java | 19 +++- .../readers/ConstantValueDoubleDictionary.java | 19 +++- .../readers/ConstantValueFloatDictionary.java |
19 +++- .../index/readers/ConstantValueIntDictionary.java | 19 +++- .../index/readers/ConstantValueLongDictionary.java | 19 +++- .../readers/ConstantValueStringDictionary.java | 19 +++- .../core/segment/index/readers/Dictionary.java | 102 ++++++++++++++++++--- .../segment/index/readers/MutableDictionary.java | 48 ++++++++++ ...ngeOfflineDictionaryPredicateEvaluatorTest.java | 53 +++++------ .../impl/dictionary/MutableDictionaryTest.java | 39 ++++---- 23 files changed, 382 insertions(+), 275 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index 24c9514..d0f391f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -42,7 +42,7 @@ import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; -import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; import org.apache.pinot.core.realtime.impl.dictionary.BaseOffHeapMutableDictionary; import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory; import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex; @@ -247,7 +247,7 @@ public class MutableSegmentImpl implements MutableSegment { DataType dataType = fieldSpec.getDataType(); boolean isFixedWidthColumn = dataType.isFixedWidth(); MutableForwardIndex forwardIndex; - BaseMutableDictionary dictionary; + MutableDictionary dictionary; if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, 
column)) { // No dictionary column (always single-valued) assert fieldSpec.isSingleValueField(); @@ -492,7 +492,7 @@ public class MutableSegmentImpl implements MutableSegment { String column = entry.getKey(); IndexContainer indexContainer = entry.getValue(); Object value = row.getValue(column); - BaseMutableDictionary dictionary = indexContainer._dictionary; + MutableDictionary dictionary = indexContainer._dictionary; if (dictionary != null) { if (indexContainer._fieldSpec.isSingleValueField()) { indexContainer._dictId = dictionary.index(value); @@ -744,7 +744,7 @@ public class MutableSegmentImpl implements MutableSegment { * Helper method to read the value for the given document id. */ private static Object getValue(int docId, MutableForwardIndex forwardIndex, - @Nullable BaseMutableDictionary dictionary, int maxNumMultiValues) { + @Nullable MutableDictionary dictionary, int maxNumMultiValues) { if (dictionary != null) { // Dictionary based if (forwardIndex.isSingleValue()) { @@ -856,7 +856,7 @@ public class MutableSegmentImpl implements MutableSegment { */ public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) { IndexContainer indexContainer = _indexContainerMap.get(column); - BaseMutableDictionary dictionary = indexContainer._dictionary; + MutableDictionary dictionary = indexContainer._dictionary; // Sort all values in the dictionary int numValues = dictionary.length(); @@ -1032,7 +1032,7 @@ public class MutableSegmentImpl implements MutableSegment { final Set<Integer> _partitions; final NumValuesInfo _numValuesInfo; final MutableForwardIndex _forwardIndex; - final BaseMutableDictionary _dictionary; + final MutableDictionary _dictionary; final RealtimeInvertedIndexReader _invertedIndex; final InvertedIndexReader _rangeIndex; final RealtimeLuceneTextIndexReader _textIndex; @@ -1048,7 +1048,7 @@ public class MutableSegmentImpl implements MutableSegment { IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction, @Nullable 
Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, - @Nullable BaseMutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex, + @Nullable MutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter, @Nullable MutableNullValueVector nullValueVector) { _fieldSpec = fieldSpec; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java index 3bc676e..b497031 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java @@ -26,7 +26,6 @@ import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; -import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator; import org.apache.pinot.core.query.request.context.predicate.Predicate; @@ -54,8 +53,7 @@ public class FilterOperatorUtils { return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs); } if (dataSource.getRangeIndex() != null) { - return new RangeIndexBasedFilterOperator((OfflineDictionaryBasedRangePredicateEvaluator) predicateEvaluator, - dataSource, numDocs); + return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs); } return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs); } else if (predicateType == Predicate.Type.REGEXP_LIKE) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java index d9eee0b..d50c747 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java @@ -28,7 +28,7 @@ import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFa import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.FloatRawValueBasedRangePredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.IntRawValueBasedRangePredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.LongRawValueBasedRangePredicateEvaluator; -import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator; import org.apache.pinot.core.segment.index.readers.RangeIndexReader; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -56,12 +56,12 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator { int firstRangeId; int lastRangeId; - if (_rangePredicateEvaluator instanceof OfflineDictionaryBasedRangePredicateEvaluator) { + if (_rangePredicateEvaluator instanceof SortedDictionaryBasedRangePredicateEvaluator) { firstRangeId = rangeIndexReader - .findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId()); + .findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId()); // NOTE: End dictionary id is exclusive in OfflineDictionaryBasedRangePredicateEvaluator. 
lastRangeId = rangeIndexReader - .findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1); + .findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1); } else { switch (_rangePredicateEvaluator.getDataType()) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java index 4dc25ee..d66127b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java @@ -25,11 +25,11 @@ import java.util.Collections; import java.util.List; import org.apache.pinot.common.utils.Pairs.IntPair; import org.apache.pinot.core.common.DataSource; -import org.apache.pinot.core.segment.index.readers.SortedIndexReader; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.SortedDocIdSet; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; -import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator; +import org.apache.pinot.core.segment.index.readers.SortedIndexReader; public class SortedIndexBasedFilterOperator extends BaseFilterOperator { @@ -54,10 +54,10 @@ public class SortedIndexBasedFilterOperator extends BaseFilterOperator { // - "Subtractive" operators (NEQ, NOT IN): Build up a list of non-matching docIdRanges with adjacent ones merged, // then subtract them from the range of [0, numDocs) to get a list of matching docIdRanges. 
- if (_predicateEvaluator instanceof OfflineDictionaryBasedRangePredicateEvaluator) { + if (_predicateEvaluator instanceof SortedDictionaryBasedRangePredicateEvaluator) { // For RANGE predicate, use start/end document id to construct a new document id range - OfflineDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator = - (OfflineDictionaryBasedRangePredicateEvaluator) _predicateEvaluator; + SortedDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator = + (SortedDictionaryBasedRangePredicateEvaluator) _predicateEvaluator; int startDocId = _sortedIndexReader.getDocIds(rangePredicateEvaluator.getStartDictId()).getLeft(); // NOTE: End dictionary id is exclusive in OfflineDictionaryBasedRangePredicateEvaluator. int endDocId = _sortedIndexReader.getDocIds(rangePredicateEvaluator.getEndDictId() - 1).getRight(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java index 6ec5aad..0fb1b6c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java @@ -21,8 +21,6 @@ package org.apache.pinot.core.operator.filter.predicate; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.query.request.context.predicate.RangePredicate; -import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary; -import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.utils.ByteArray; @@ -46,11 +44,10 @@ public class RangePredicateEvaluatorFactory { */ public static 
BaseDictionaryBasedPredicateEvaluator newDictionaryBasedEvaluator(RangePredicate rangePredicate, Dictionary dictionary, DataType dataType) { - if (dictionary instanceof BaseImmutableDictionary) { - return new OfflineDictionaryBasedRangePredicateEvaluator(rangePredicate, (BaseImmutableDictionary) dictionary); + if (dictionary.isSorted()) { + return new SortedDictionaryBasedRangePredicateEvaluator(rangePredicate, dictionary); } else { - return new RealtimeDictionaryBasedRangePredicateEvaluator(rangePredicate, (BaseMutableDictionary) dictionary, - dataType); + return new UnsortedDictionaryBasedRangePredicateEvaluator(rangePredicate, dictionary, dataType); } } @@ -81,14 +78,14 @@ public class RangePredicateEvaluatorFactory { } } - public static final class OfflineDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator { + public static final class SortedDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator { final int _startDictId; // Exclusive final int _endDictId; final int _numMatchingDictIds; int[] _matchingDictIds; - OfflineDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, BaseImmutableDictionary dictionary) { + SortedDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, Dictionary dictionary) { String lowerBound = rangePredicate.getLowerBound(); String upperBound = rangePredicate.getUpperBound(); boolean lowerInclusive = rangePredicate.isLowerInclusive(); @@ -170,19 +167,19 @@ public class RangePredicateEvaluatorFactory { } } - private static final class RealtimeDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator { + private static final class UnsortedDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator { // When the cardinality of the column is lower than this threshold, pre-calculate the matching dictionary ids; // otherwise, fetch the value when evaluating each dictionary id. 
// TODO: Tune this threshold private static final int DICT_ID_SET_BASED_CARDINALITY_THRESHOLD = 1000; - final BaseMutableDictionary _dictionary; + final Dictionary _dictionary; final DataType _dataType; final boolean _dictIdSetBased; final IntSet _matchingDictIdSet; final BaseRawValueBasedPredicateEvaluator _rawValueBasedEvaluator; - RealtimeDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, BaseMutableDictionary dictionary, + UnsortedDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, Dictionary dictionary, DataType dataType) { _dictionary = dictionary; _dataType = dataType; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java index 7970f1a..8c4c93a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java @@ -22,8 +22,8 @@ import java.util.Set; import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.common.DataSourceMetadata; import org.apache.pinot.core.data.partition.PartitionFunction; -import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary; import org.apache.pinot.core.segment.creator.ColumnStatistics; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.core.segment.index.readers.MutableForwardIndex; import org.apache.pinot.spi.data.FieldSpec; @@ -36,32 +36,35 @@ import org.apache.pinot.spi.data.FieldSpec; public class RealtimeColumnStatistics implements ColumnStatistics { private final DataSource _dataSource; private final int[] _sortedDocIdIterationOrder; - private final BaseMutableDictionary _mutableDictionary; + + // NOTE: For new added columns during the ingestion, this will be constant value dictionary instead of 
mutable + // dictionary. + private final Dictionary _dictionary; public RealtimeColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) { _dataSource = dataSource; _sortedDocIdIterationOrder = sortedDocIdIterationOrder; - _mutableDictionary = (BaseMutableDictionary) dataSource.getDictionary(); + _dictionary = dataSource.getDictionary(); } @Override public Object getMinValue() { - return _mutableDictionary.getMinVal(); + return _dictionary.getMinVal(); } @Override public Object getMaxValue() { - return _mutableDictionary.getMaxVal(); + return _dictionary.getMaxVal(); } @Override public Object getUniqueValuesSet() { - return _mutableDictionary.getSortedValues(); + return _dictionary.getSortedValues(); } @Override public int getCardinality() { - return _mutableDictionary.length(); + return _dictionary.length(); } @Override @@ -71,15 +74,15 @@ public class RealtimeColumnStatistics implements ColumnStatistics { // If this column is a string/bytes column, iterate over the dictionary to find the maximum length FieldSpec.DataType dataType = _dataSource.getDataSourceMetadata().getDataType(); - int length = _mutableDictionary.length(); + int length = _dictionary.length(); if (dataType.equals(FieldSpec.DataType.STRING)) { for (int i = 0; i < length; i++) { - minStringLength = Math.min(_mutableDictionary.getStringValue(i).length(), minStringLength); + minStringLength = Math.min(_dictionary.getStringValue(i).length(), minStringLength); } } else if (dataType.equals(FieldSpec.DataType.BYTES)) { for (int i = 0; i < length; i++) { - minStringLength = Math.min(_mutableDictionary.getBytesValue(i).length, minStringLength); + minStringLength = Math.min(_dictionary.getBytesValue(i).length, minStringLength); } } @@ -93,15 +96,15 @@ public class RealtimeColumnStatistics implements ColumnStatistics { // If this column is a string/bytes column, iterate over the dictionary to find the maximum length FieldSpec.DataType dataType = 
_dataSource.getDataSourceMetadata().getDataType(); - int length = _mutableDictionary.length(); + int length = _dictionary.length(); if (dataType.equals(FieldSpec.DataType.STRING)) { for (int i = 0; i < length; i++) { - maximumStringLength = Math.max(_mutableDictionary.getStringValue(i).length(), maximumStringLength); + maximumStringLength = Math.max(_dictionary.getStringValue(i).length(), maximumStringLength); } } else if (dataType.equals(FieldSpec.DataType.BYTES)) { for (int i = 0; i < length; i++) { - maximumStringLength = Math.max(_mutableDictionary.getBytesValue(i).length, maximumStringLength); + maximumStringLength = Math.max(_dictionary.getBytesValue(i).length, maximumStringLength); } } @@ -130,7 +133,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics { int previousDictId = mutableForwardIndex.getDictId(_sortedDocIdIterationOrder[0]); for (int i = 1; i < numDocs; i++) { int currentDictId = mutableForwardIndex.getDictId(_sortedDocIdIterationOrder[i]); - if (_mutableDictionary.compare(previousDictId, currentDictId) > 0) { + if (_dictionary.compare(previousDictId, currentDictId) > 0) { return false; } previousDictId = currentDictId; @@ -139,7 +142,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics { int previousDictId = mutableForwardIndex.getDictId(0); for (int i = 1; i < numDocs; i++) { int currentDictId = mutableForwardIndex.getDictId(i); - if (_mutableDictionary.compare(previousDictId, currentDictId) > 0) { + if (_dictionary.compare(previousDictId, currentDictId) > 0) { return false; } previousDictId = currentDictId; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseMutableDictionary.java deleted file mode 100644 index 863ff80..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseMutableDictionary.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.realtime.impl.dictionary; - -import it.unimi.dsi.fastutil.ints.IntSet; -import org.apache.pinot.core.segment.index.readers.BaseDictionary; - - -public abstract class BaseMutableDictionary extends BaseDictionary { - - public boolean isSorted() { - return false; - } - - /** - * Indexes a single-value entry (a value of the dictionary type) into the dictionary, and returns the dictId of the - * value. - */ - public abstract int index(Object value); - - /** - * Indexes a multi-value entry (an array of values of the dictionary type) into the dictionary, and returns an array - * of dictIds for each value. - */ - public abstract int[] index(Object[] values); - - /** - * Returns the comparison result of value for dictId 1 and dictId 2, i.e. {@code value1.compareTo(value2)}. - */ - public abstract int compare(int dictId1, int dictId2); - - /** - * Returns a set of dictIds in the given value range, where lower/upper bound can be "*" which indicates unbounded - * range. This API is for range predicate evaluation. 
- */ - public abstract IntSet getDictIdsInRange(String lower, String upper, boolean includeLower, boolean includeUpper); - - /** - * Returns the minimum value in the dictionary. Note that for type BYTES, {@code ByteArray} will be returned. This API - * is for stats collection and will be called after all values are inserted. - */ - public abstract Comparable getMinVal(); - - /** - * Returns the maximum value in the dictionary. Note that for type BYTES, {@code ByteArray} will be returned. This API - * is for stats collection and will be called after all values are inserted. - */ - public abstract Comparable getMaxVal(); - - /** - * Returns an sorted array of all values in the dictionary. For type INT/LONG/FLOAT/DOUBLE, primitive type array will - * be returned; for type STRING, {@code String[]} will be returned; for type BYTES, {@code ByteArray[]} will be - * returned. This API is for stats collection and will be called after all values are inserted. - */ - public abstract Object getSortedValues(); -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java index 4cf4cbd..e14ebad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +132,7 @@ import org.slf4j.LoggerFactory; * - It may be useful to implement a way to stop adding new items when the 
the number of buffers reaches a certain * threshold. In this case, we could close the realtime segment, and start a new one with bigger buffers. */ -public abstract class BaseOffHeapMutableDictionary extends BaseMutableDictionary { +public abstract class BaseOffHeapMutableDictionary implements MutableDictionary { private static final Logger LOGGER = LoggerFactory.getLogger(BaseOffHeapMutableDictionary.class); // List of primes from http://compoasso.free.fr/primelistweb/page/prime/liste_online_en.php diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java index d2b55f6..ef0bd01 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.impl.dictionary; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; /** @@ -31,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; * value later, but not reversely. So whenever we return a valid dictionary id for a value, we need to ensure the value * can be fetched by the dictionary id returned. 
*/ -public abstract class BaseOnHeapMutableDictionary extends BaseMutableDictionary { +public abstract class BaseOnHeapMutableDictionary implements MutableDictionary { private static final int SHIFT_OFFSET = 13; // INITIAL_DICTIONARY_SIZE = 8192 private static final int INITIAL_DICTIONARY_SIZE = 1 << SHIFT_OFFSET; private static final int MASK = 0xFFFFFFFF >>> (Integer.SIZE - SHIFT_OFFSET); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryFactory.java index b47528a..2880f38 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryFactory.java @@ -18,15 +18,16 @@ */ package org.apache.pinot.core.realtime.impl.dictionary; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; public class MutableDictionaryFactory { private MutableDictionaryFactory() { } - public static BaseMutableDictionary getMutableDictionary(FieldSpec.DataType dataType, boolean isOffHeapAllocation, + public static MutableDictionary getMutableDictionary(DataType dataType, boolean isOffHeapAllocation, PinotDataBufferMemoryManager memoryManager, int avgLength, int cardinality, String allocationContext) { if (isOffHeapAllocation) { // OnHeap allocation diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java deleted file mode 100644 index 9a0effb..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java +++ /dev/null @@ -1,72 +0,0 
@@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.segment.index.readers; - -public abstract class BaseDictionary implements Dictionary { - - /** - * Should be overridden by dictionary of type STRING and BYTES. 
- */ - @Override - public byte[] getBytesValue(int dictId) { - throw new UnsupportedOperationException(); - } - - @Override - public void readIntValues(int[] dictIds, int length, int[] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getIntValue(dictIds[i]); - } - } - - @Override - public void readLongValues(int[] dictIds, int length, long[] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getLongValue(dictIds[i]); - } - } - - @Override - public void readFloatValues(int[] dictIds, int length, float[] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getFloatValue(dictIds[i]); - } - } - - @Override - public void readDoubleValues(int[] dictIds, int length, double[] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getDoubleValue(dictIds[i]); - } - } - - @Override - public void readStringValues(int[] dictIds, int length, String[] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getStringValue(dictIds[i]); - } - } - - @Override - public void readBytesValues(int[] dictIds, int length, byte[][] outValues) { - for (int i = 0; i < length; i++) { - outValues[i] = getBytesValue(dictIds[i]); - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java index 043151a..b9f5f53 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java @@ -19,17 +19,22 @@ package org.apache.pinot.core.segment.index.readers; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.ints.IntSet; import java.io.IOException; import java.util.Arrays; import org.apache.pinot.common.utils.StringUtil; -import org.apache.pinot.spi.utils.ByteArray; import 
org.apache.pinot.core.io.util.FixedByteValueReaderWriter; import org.apache.pinot.core.io.util.ValueReader; import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter; import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.spi.utils.ByteArray; -public abstract class BaseImmutableDictionary extends BaseDictionary { +/** + * Base implementation of immutable dictionary. + */ +@SuppressWarnings("rawtypes") +public abstract class BaseImmutableDictionary implements Dictionary { private final ValueReader _valueReader; private final int _length; private final int _numBytesPerValue; @@ -59,12 +64,6 @@ public abstract class BaseImmutableDictionary extends BaseDictionary { _paddingByte = 0; } - /** - * Returns the insertion index of string representation of the value in the dictionary. Follows the same behavior as - * in {@link Arrays#binarySearch(Object[], Object)}. This API is for range predicate evaluation. - */ - public abstract int insertionIndexOf(String stringValue); - @Override public boolean isSorted() { return true; @@ -82,6 +81,34 @@ public abstract class BaseImmutableDictionary extends BaseDictionary { } @Override + public IntSet getDictIdsInRange(String lower, String upper, boolean includeLower, boolean includeUpper) { + // This method should not be called for sorted dictionary. + throw new UnsupportedOperationException(); + } + + @Override + public int compare(int dictId1, int dictId2) { + return Integer.compare(dictId1, dictId2); + } + + @Override + public Comparable getMinVal() { + return (Comparable) get(0); + } + + @Override + public Comparable getMaxVal() { + return (Comparable) get(_length - 1); + } + + @Override + public Object getSortedValues() { + // This method is for the stats collection phase when sealing the consuming segment, so it is not required for + // regular immutable dictionary within the immutable segment. 
+ throw new UnsupportedOperationException(); + } + + @Override public void close() throws IOException { if (_valueReader != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java index 8d144ca..cfcb654 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.index.readers; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.BytesUtils; @@ -33,13 +34,23 @@ public class BytesDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.BYTES; + } + + @Override public int insertionIndexOf(String stringValue) { return binarySearch(BytesUtils.toBytes(stringValue)); } @Override - public DataType getValueType() { - return DataType.BYTES; + public ByteArray getMinVal() { + return new ByteArray(getBytes(0)); + } + + @Override + public ByteArray getMaxVal() { + return new ByteArray(getBytes(length() - 1)); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueBytesDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueBytesDictionary.java index 8efe097..6dbba30 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueBytesDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueBytesDictionary.java @@ -35,6 +35,11 @@ public class ConstantValueBytesDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.BYTES; + } + + 
@Override public int insertionIndexOf(String stringValue) { int result = ByteArray.compare(BytesUtils.toBytes(stringValue), _value); if (result < 0) { @@ -47,8 +52,18 @@ public class ConstantValueBytesDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.BYTES; + public ByteArray getMinVal() { + return new ByteArray(_value); + } + + @Override + public ByteArray getMaxVal() { + return new ByteArray(_value); + } + + @Override + public Object getSortedValues() { + return new ByteArray[]{new ByteArray(_value)}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueDoubleDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueDoubleDictionary.java index 24cb4a0..2761e13 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueDoubleDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueDoubleDictionary.java @@ -33,6 +33,11 @@ public class ConstantValueDoubleDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.DOUBLE; + } + + @Override public int insertionIndexOf(String stringValue) { double doubleValue = Double.parseDouble(stringValue); if (doubleValue < _value) { @@ -45,8 +50,18 @@ public class ConstantValueDoubleDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.DOUBLE; + public Double getMinVal() { + return _value; + } + + @Override + public Double getMaxVal() { + return _value; + } + + @Override + public double[] getSortedValues() { + return new double[]{_value}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueFloatDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueFloatDictionary.java index cea2f46..2a08cee 
100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueFloatDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueFloatDictionary.java @@ -33,6 +33,11 @@ public class ConstantValueFloatDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.FLOAT; + } + + @Override public int insertionIndexOf(String stringValue) { float floatValue = Float.parseFloat(stringValue); if (floatValue < _value) { @@ -45,8 +50,18 @@ public class ConstantValueFloatDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.FLOAT; + public Float getMinVal() { + return _value; + } + + @Override + public Float getMaxVal() { + return _value; + } + + @Override + public float[] getSortedValues() { + return new float[]{_value}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueIntDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueIntDictionary.java index 81830de..e6db8a5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueIntDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueIntDictionary.java @@ -33,6 +33,11 @@ public class ConstantValueIntDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.INT; + } + + @Override public int insertionIndexOf(String stringValue) { int intValue = Integer.parseInt(stringValue); if (intValue < _value) { @@ -45,8 +50,18 @@ public class ConstantValueIntDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.INT; + public Integer getMinVal() { + return _value; + } + + @Override + public Integer getMaxVal() { + return _value; + } + + @Override + public int[] 
getSortedValues() { + return new int[]{_value}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueLongDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueLongDictionary.java index d988561..1c49f50 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueLongDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueLongDictionary.java @@ -33,6 +33,11 @@ public class ConstantValueLongDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.LONG; + } + + @Override public int insertionIndexOf(String stringValue) { long longValue = Long.parseLong(stringValue); if (longValue < _value) { @@ -45,8 +50,18 @@ public class ConstantValueLongDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.LONG; + public Long getMinVal() { + return _value; + } + + @Override + public Long getMaxVal() { + return _value; + } + + @Override + public long[] getSortedValues() { + return new long[]{_value}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueStringDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueStringDictionary.java index 242cf71..ae57479 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueStringDictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ConstantValueStringDictionary.java @@ -34,6 +34,11 @@ public class ConstantValueStringDictionary extends BaseImmutableDictionary { } @Override + public DataType getValueType() { + return DataType.STRING; + } + + @Override public int insertionIndexOf(String stringValue) { int result = stringValue.compareTo(_value); if (result < 0) { @@ -46,8 +51,18 @@ public 
class ConstantValueStringDictionary extends BaseImmutableDictionary { } @Override - public DataType getValueType() { - return DataType.STRING; + public String getMinVal() { + return _value; + } + + @Override + public String getMaxVal() { + return _value; + } + + @Override + public String[] getSortedValues() { + return new String[]{_value}; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/Dictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/Dictionary.java index b3b9a5c..846f379 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/Dictionary.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/Dictionary.java @@ -18,8 +18,9 @@ */ package org.apache.pinot.core.segment.index.readers; +import it.unimi.dsi.fastutil.ints.IntSet; import java.io.Closeable; -import org.apache.pinot.spi.data.FieldSpec; +import java.util.Arrays; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -27,11 +28,12 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; * Interface for the dictionary. For the read APIs, type conversion among INT, LONG, FLOAT, DOUBLE, STRING should be * supported. Type conversion between STRING and BYTES via Hex encoding/decoding should be supported. */ +@SuppressWarnings("rawtypes") public interface Dictionary extends Closeable { int NULL_VALUE_INDEX = -1; /** - * NOTE: Immutable dictionary is always sorted; mutable dictionary is always unsorted. + * Returns {@code true} if the values in the dictionary are sorted, {@code false} otherwise. */ boolean isSorted(); @@ -40,14 +42,55 @@ public interface Dictionary extends Closeable { */ DataType getValueType(); + /** + * Returns the number of values in the dictionary. + */ int length(); /** * Returns the index of the string representation of the value in the dictionary, or {@link #NULL_VALUE_INDEX} (-1) if - * the value does not exist. This API is for cross-type predicate evaluation. 
+ * the value does not exist. This method is for the cross-type predicate evaluation. */ int indexOf(String stringValue); + /** + * Returns the insertion index of the string representation of the value in the dictionary. This method follows the + * same behavior as in {@link Arrays#binarySearch(Object[], Object)}. All sorted dictionaries should support this + * method. This method is for the range predicate evaluation. + */ + int insertionIndexOf(String stringValue); + + /** + * Returns a set of dictIds in the given value range, where lower/upper bound can be "*" which indicates unbounded + * range. All unsorted dictionaries should support this method. This method is for the range predicate evaluation. + */ + IntSet getDictIdsInRange(String lower, String upper, boolean includeLower, boolean includeUpper); + + /** + * Returns the comparison result of the values (actual value instead of string representation of the value) for the + * given dictionary ids, i.e. {@code value1.compareTo(value2)}. + */ + int compare(int dictId1, int dictId2); + + /** + * Returns the minimum value in the dictionary. For type BYTES, {@code ByteArray} will be returned. Undefined if the + * dictionary is empty. + */ + Comparable getMinVal(); + + /** + * Returns the maximum value in the dictionary. For type BYTES, {@code ByteArray} will be returned. Undefined if the + * dictionary is empty. + */ + Comparable getMaxVal(); + + /** + * Returns a sorted array of all values in the dictionary. For type INT/LONG/FLOAT/DOUBLE, primitive type array will + * be returned; for type STRING, {@code String[]} will be returned; for type BYTES, {@code ByteArray[]} will be + * returned. This method is for the stats collection phase when sealing the consuming segment.
+ */ + Object getSortedValues(); + // Single-value read APIs /** @@ -74,19 +117,48 @@ public interface Dictionary extends Closeable { String getStringValue(int dictId); - byte[] getBytesValue(int dictId); + /** + * NOTE: Should be overridden for STRING and BYTES dictionary. + */ + default byte[] getBytesValue(int dictId) { + throw new UnsupportedOperationException(); + } // Batch read APIs - void readIntValues(int[] dictIds, int length, int[] outValues); - - void readLongValues(int[] dictIds, int length, long[] outValues); - - void readFloatValues(int[] dictIds, int length, float[] outValues); - - void readDoubleValues(int[] dictIds, int length, double[] outValues); - - void readStringValues(int[] dictIds, int length, String[] outValues); - - void readBytesValues(int[] dictIds, int length, byte[][] outValues); + default void readIntValues(int[] dictIds, int length, int[] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getIntValue(dictIds[i]); + } + } + + default void readLongValues(int[] dictIds, int length, long[] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getLongValue(dictIds[i]); + } + } + + default void readFloatValues(int[] dictIds, int length, float[] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getFloatValue(dictIds[i]); + } + } + + default void readDoubleValues(int[] dictIds, int length, double[] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getDoubleValue(dictIds[i]); + } + } + + default void readStringValues(int[] dictIds, int length, String[] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getStringValue(dictIds[i]); + } + } + + default void readBytesValues(int[] dictIds, int length, byte[][] outValues) { + for (int i = 0; i < length; i++) { + outValues[i] = getBytesValue(dictIds[i]); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/MutableDictionary.java 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/MutableDictionary.java new file mode 100644 index 0000000..7dabf7d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/MutableDictionary.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.readers; + +/** + * Interface for mutable dictionary (for CONSUMING segment). + */ +public interface MutableDictionary extends Dictionary { + + /** + * Indexes a single-value entry (a value of the dictionary type) into the dictionary, and returns the dictId of the + * value. + */ + int index(Object value); + + /** + * Indexes a multi-value entry (an array of values of the dictionary type) into the dictionary, and returns an array + * of dictIds for each value. + */ + int[] index(Object[] values); + + @Override + default boolean isSorted() { + return false; + } + + @Override + default int insertionIndexOf(String stringValue) { + // This method should not be called for unsorted dictionary. 
+ throw new UnsupportedOperationException(); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/predicate/RangeOfflineDictionaryPredicateEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/predicate/RangeOfflineDictionaryPredicateEvaluatorTest.java index 07efd9c..b068d6d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/predicate/RangeOfflineDictionaryPredicateEvaluatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/predicate/RangeOfflineDictionaryPredicateEvaluatorTest.java @@ -20,7 +20,7 @@ package org.apache.pinot.core.operator.filter.predicate; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.predicate.RangePredicate; -import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,10 +40,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [2, 5] rangeStart = 2; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -62,10 +62,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // (2, 5] rangeStart = 2; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = 
createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, false, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertFalse(evaluator.applySV(rangeStart)); @@ -84,10 +84,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [2, 5) rangeStart = 2; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, false); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -106,10 +106,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // (2, 5) rangeStart = 2; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, false, rangeEnd, false); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertFalse(evaluator.applySV(rangeStart)); @@ -140,10 +140,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [0, 5) 
rangeStart = 0; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, false); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -161,10 +161,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [0, 5] rangeStart = 0; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -176,10 +176,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [6, DICT_LEN-1] rangeStart = 6; rangeEnd = DICT_LEN - 1; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); 
Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -197,10 +197,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // (6, DICT_LEN-1] rangeStart = 6; rangeEnd = DICT_LEN - 1; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, false, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertFalse(evaluator.isAlwaysTrue()); Assert.assertFalse(evaluator.applySV(rangeStart)); @@ -212,10 +212,10 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // [0, DICT_LEN-1] rangeStart = 0; rangeEnd = DICT_LEN - 1; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary dictionary = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, true, rangeEnd, true); PredicateEvaluator evaluator = - RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); + RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, dictionary, DataType.INT); Assert.assertFalse(evaluator.isAlwaysFalse()); Assert.assertTrue(evaluator.isAlwaysTrue()); Assert.assertTrue(evaluator.applySV(rangeStart)); @@ -232,7 +232,7 @@ public class RangeOfflineDictionaryPredicateEvaluatorTest { // (4, 5) rangeStart = 4; rangeEnd = 5; - BaseImmutableDictionary reader = createReader(rangeStart, rangeEnd); + Dictionary reader = createDictionary(rangeStart, rangeEnd); RangePredicate predicate = createPredicate(rangeStart, false, rangeEnd, false); PredicateEvaluator evaluator = RangePredicateEvaluatorFactory.newDictionaryBasedEvaluator(predicate, reader, DataType.INT); @@ -251,12 +251,13 @@ public class 
RangeOfflineDictionaryPredicateEvaluatorTest { } } - private BaseImmutableDictionary createReader(int rangeStart, int rangeEnd) { - BaseImmutableDictionary reader = mock(BaseImmutableDictionary.class); - when(reader.insertionIndexOf("lower")).thenReturn(rangeStart); - when(reader.insertionIndexOf("upper")).thenReturn(rangeEnd); - when(reader.length()).thenReturn(DICT_LEN); - return reader; + private Dictionary createDictionary(int rangeStart, int rangeEnd) { + Dictionary dictionary = mock(Dictionary.class); + when(dictionary.isSorted()).thenReturn(true); + when(dictionary.length()).thenReturn(DICT_LEN); + when(dictionary.insertionIndexOf("lower")).thenReturn(rangeStart); + when(dictionary.insertionIndexOf("upper")).thenReturn(rangeEnd); + return dictionary; } private RangePredicate createPredicate(int lower, boolean inclLower, int upper, boolean inclUpper) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java index 82d8a57..f5061ac 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java @@ -31,10 +31,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; import org.apache.pinot.core.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.ByteArray; import org.testng.Assert; import org.testng.annotations.AfterClass; import 
org.testng.annotations.Test; @@ -60,14 +61,14 @@ public class MutableDictionaryTest { @Test public void testSingleReaderSingleWriter() { try { - try (BaseMutableDictionary dictionary = new IntOnHeapMutableDictionary()) { + try (MutableDictionary dictionary = new IntOnHeapMutableDictionary()) { testSingleReaderSingleWriter(dictionary, FieldSpec.DataType.INT); } - try (BaseMutableDictionary dictionary = new IntOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, + try (MutableDictionary dictionary = new IntOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, "intColumn")) { testSingleReaderSingleWriter(dictionary, FieldSpec.DataType.INT); } - try (BaseMutableDictionary dictionary = new StringOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, + try (MutableDictionary dictionary = new StringOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, "stringColumn", 32)) { testSingleReaderSingleWriter(dictionary, FieldSpec.DataType.STRING); } @@ -76,7 +77,7 @@ public class MutableDictionaryTest { } } - private void testSingleReaderSingleWriter(BaseMutableDictionary dictionary, FieldSpec.DataType dataType) + private void testSingleReaderSingleWriter(MutableDictionary dictionary, FieldSpec.DataType dataType) throws Exception { Future<Void> readerFuture = _executorService.submit(new Reader(dictionary, dataType)); Future<Void> writerFuture = _executorService.submit(new Writer(dictionary, dataType)); @@ -88,14 +89,14 @@ public class MutableDictionaryTest { @Test public void testMultiReadersSingleWriter() { try { - try (BaseMutableDictionary dictionary = new IntOnHeapMutableDictionary()) { + try (MutableDictionary dictionary = new IntOnHeapMutableDictionary()) { testMultiReadersSingleWriter(dictionary, FieldSpec.DataType.INT); } - try (BaseMutableDictionary dictionary = new IntOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, + try (MutableDictionary dictionary = new IntOffHeapMutableDictionary(EST_CARDINALITY, 2000, 
_memoryManager, "intColumn")) { testMultiReadersSingleWriter(dictionary, FieldSpec.DataType.INT); } - try (BaseMutableDictionary dictionary = new StringOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, + try (MutableDictionary dictionary = new StringOffHeapMutableDictionary(EST_CARDINALITY, 2000, _memoryManager, "stringColumn", 32)) { testMultiReadersSingleWriter(dictionary, FieldSpec.DataType.STRING); } @@ -104,7 +105,7 @@ public class MutableDictionaryTest { } } - private void testMultiReadersSingleWriter(BaseMutableDictionary dictionary, FieldSpec.DataType dataType) + private void testMultiReadersSingleWriter(MutableDictionary dictionary, FieldSpec.DataType dataType) throws Exception { Future[] readerFutures = new Future[NUM_READERS]; for (int i = 0; i < NUM_READERS; i++) { @@ -122,7 +123,7 @@ public class MutableDictionaryTest { public void testOnHeapMutableDictionary() { try { for (FieldSpec.DataType dataType : DATA_TYPES) { - try (BaseMutableDictionary dictionary = MutableDictionaryFactory + try (MutableDictionary dictionary = MutableDictionaryFactory .getMutableDictionary(dataType, false, null, 0, 0, null)) { testMutableDictionary(dictionary, dataType); } @@ -139,7 +140,7 @@ public class MutableDictionaryTest { try { for (FieldSpec.DataType dataType : DATA_TYPES) { for (int maxOverflowSize : maxOverflowSizes) { - try (BaseMutableDictionary dictionary = makeOffHeapDictionary(EST_CARDINALITY, maxOverflowSize, dataType)) { + try (MutableDictionary dictionary = makeOffHeapDictionary(EST_CARDINALITY, maxOverflowSize, dataType)) { testMutableDictionary(dictionary, dataType); } } @@ -149,7 +150,7 @@ public class MutableDictionaryTest { } } - private void testMutableDictionary(BaseMutableDictionary dictionary, FieldSpec.DataType dataType) { + private void testMutableDictionary(MutableDictionary dictionary, FieldSpec.DataType dataType) { Map<Object, Integer> valueToDictId = new HashMap<>(); int numEntries = 0; @@ -200,7 +201,7 @@ public class 
MutableDictionaryTest { dictionary.length()); } - private BaseMutableDictionary makeOffHeapDictionary(int estCardinality, int maxOverflowSize, + private MutableDictionary makeOffHeapDictionary(int estCardinality, int maxOverflowSize, FieldSpec.DataType dataType) { switch (dataType) { case INT: @@ -253,10 +254,10 @@ public class MutableDictionaryTest { * <p>We can assume that we always first get the index of a value, then use the index to fetch the value. */ private class Reader implements Callable<Void> { - private final BaseMutableDictionary _dictionary; + private final MutableDictionary _dictionary; private final FieldSpec.DataType _dataType; - private Reader(BaseMutableDictionary dictionary, FieldSpec.DataType dataType) { + private Reader(MutableDictionary dictionary, FieldSpec.DataType dataType) { _dictionary = dictionary; _dataType = dataType; } @@ -282,10 +283,10 @@ public class MutableDictionaryTest { * Writer to index value into dictionary, then check the index of the value. */ private class Writer implements Callable<Void> { - private final BaseMutableDictionary _dictionary; + private final MutableDictionary _dictionary; private final FieldSpec.DataType _dataType; - private Writer(BaseMutableDictionary dictionary, FieldSpec.DataType dataType) { + private Writer(MutableDictionary dictionary, FieldSpec.DataType dataType) { _dictionary = dictionary; _dataType = dataType; } @@ -321,7 +322,7 @@ public class MutableDictionaryTest { /** * Helper method to check whether the value of the given dictId is one larger than the dictId. 
*/ - private static void checkEquals(BaseMutableDictionary dictionary, int dictId, FieldSpec.DataType dataType) { + private static void checkEquals(MutableDictionary dictionary, int dictId, FieldSpec.DataType dataType) { switch (dataType) { case INT: Assert.assertEquals(dictionary.getIntValue(dictId), dictId + 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org