This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ffbc5b0e52 Ensure min/max value generation in the segment metadata. (#10891) ffbc5b0e52 is described below commit ffbc5b0e52e2c8ba507f088ec1fd39996f22e8c1 Author: Abhishek Sharma <abhishek.sha...@spothero.com> AuthorDate: Mon Jul 3 19:19:07 2023 -0400 Ensure min/max value generation in the segment metadata. (#10891) --- .../ColumnMinMaxValueGenerator.java | 287 +++++++++++++++++---- .../index/loader/SegmentPreProcessorTest.java | 9 +- 2 files changed, 245 insertions(+), 51 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java index 2298d72cd9..5cfa637ee2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary; import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary; @@ -29,6 +30,11 @@ import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary; import org.apache.pinot.segment.local.segment.index.readers.IntDictionary; import org.apache.pinot.segment.local.segment.index.readers.LongDictionary; import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; +import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.index.StandardIndexes; @@ -37,6 +43,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.ByteArray; import static org.apache.pinot.spi.data.FieldSpec.DataType; @@ -119,65 +126,257 @@ public class ColumnMinMaxValueGenerator { private boolean needAddColumnMinMaxValueForColumn(String columnName) { ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName); - return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == null - && columnMetadata.getMaxValue() == null && !columnMetadata.isMinMaxValueInvalid(); + return columnMetadata.getMinValue() == null && columnMetadata.getMaxValue() == null + && !columnMetadata.isMinMaxValueInvalid(); } private void addColumnMinMaxValueForColumn(String columnName) throws Exception { - // Skip column without dictionary or with min/max value already set + // Skip column with min/max value already set ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName); - if (!columnMetadata.hasDictionary() || columnMetadata.getMinValue() != null - || columnMetadata.getMaxValue() != null) { + if (columnMetadata.getMinValue() != null || columnMetadata.getMaxValue() != null) 
{ return; } - PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary()); DataType dataType = columnMetadata.getDataType().getStoredType(); - int length = columnMetadata.getCardinality(); - switch (dataType) { - case INT: - try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) { - SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1)); - } - break; - case LONG: - try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) { + if (columnMetadata.hasDictionary()) { + PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary()); + int length = columnMetadata.getCardinality(); + switch (dataType) { + case INT: + try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1)); + } + break; + case LONG: + try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1)); + } + break; + case FLOAT: + try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1)); + } + break; + case DOUBLE: + try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1)); + } + break; + case STRING: + try 
(StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length, + columnMetadata.getColumnMaxLength())) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1)); + } + break; + case BYTES: + try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length, + columnMetadata.getColumnMaxLength())) { + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, + bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1)); + } + break; + default: + throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName); + } + } else { + // setting min/max for non-dictionary columns. + int numDocs = columnMetadata.getTotalDocs(); + boolean isSingleValueField = _segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField(); + PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.forward()); + switch (dataType) { + case INT: { + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + if (isSingleValueField) { + try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( + forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + int value = rawIndexReader.getInt(docId, readerContext); + min = Math.min(min, value); + max = Math.max(max, value); + } + } + } else { + try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader( + forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + int[] value = rawIndexReader.getIntMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + min = Math.min(min, value[i]); + max = Math.max(max, 
value[i]); + } + } + } + } SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1)); - } - break; - case FLOAT: - try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) { + String.valueOf(min), String.valueOf(max)); + } + break; + case LONG: { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + if (isSingleValueField) { + try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( + forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + long value = rawIndexReader.getLong(docId, readerContext); + min = Math.min(min, value); + max = Math.max(max, value); + } + } + } else { + try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader( + forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + long[] value = rawIndexReader.getLongMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + min = Math.min(min, value[i]); + max = Math.max(max, value[i]); + } + } + } + } SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1)); - } - break; - case DOUBLE: - try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) { + String.valueOf(min), String.valueOf(max)); + } + break; + case FLOAT: { + float min = Float.POSITIVE_INFINITY; + float max = Float.NEGATIVE_INFINITY; // not Float.MIN_VALUE: that is the smallest positive float + if (isSingleValueField) { + try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( + forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs;
docId++) { + float value = rawIndexReader.getFloat(docId, readerContext); + min = Math.min(min, value); + max = Math.max(max, value); + } + } + } else { + try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader( + forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + float[] value = rawIndexReader.getFloatMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + min = Math.min(min, value[i]); + max = Math.max(max, value[i]); + } + } + } + } SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1)); - } - break; - case STRING: - try (StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length, - columnMetadata.getColumnMaxLength())) { + String.valueOf(min), String.valueOf(max)); + } + break; + case DOUBLE: { + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; // not Double.MIN_VALUE: that is the smallest positive double + if (isSingleValueField) { + try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( + forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + double value = rawIndexReader.getDouble(docId, readerContext); + min = Math.min(min, value); + max = Math.max(max, value); + } + } + } else { + try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader( + forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + double[] value = rawIndexReader.getDoubleMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + min = Math.min(min, value[i]); + max = Math.max(max, value[i]); + } + } + } + }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1)); - } - break; - case BYTES: - try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length, - columnMetadata.getColumnMaxLength())) { + String.valueOf(min), String.valueOf(max)); + } + break; + case STRING: { + String min = null; + String max = null; + if (isSingleValueField) { + try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer, + DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + String value = rawIndexReader.getString(docId, readerContext); + if (min == null || StringUtils.compare(min, value) > 0) { + min = value; + } + if (max == null || StringUtils.compare(max, value) < 0) { + max = value; + } + } + } + } else { + try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer, + DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + String[] value = rawIndexReader.getStringMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + if (min == null || StringUtils.compare(min, value[i]) > 0) { + min = value[i]; + } + if (max == null || StringUtils.compare(max, value[i]) < 0) { + max = value[i]; + } + } + } + } + } + SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, min, max); + } + break; + case BYTES: { + byte[] min = null; + byte[] max = null; + if (isSingleValueField) { + try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer, + DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + byte[] value = rawIndexReader.getBytes(docId,
readerContext); + if (min == null || ByteArray.compare(value, min) < 0) { // value sorts before current min + min = value; + } + if (max == null || ByteArray.compare(value, max) > 0) { // value sorts after current max + max = value; + } + } + } + } else { + try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer, + DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) { + for (int docId = 0; docId < numDocs; docId++) { + byte[][] value = rawIndexReader.getBytesMV(docId, readerContext); + for (int i = 0; i < value.length; i++) { + if (min == null || ByteArray.compare(value[i], min) < 0) { + min = value[i]; + } + if (max == null || ByteArray.compare(value[i], max) > 0) { + max = value[i]; + } + } + } + } + } SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, - bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1)); - } - break; - default: - throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName); + String.valueOf(new ByteArray(min)), String.valueOf(new ByteArray(max))); + } + break; + default: + throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName); + } } - _minMaxValueAdded = true; } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 16d265c04d..0bdaefca0a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -1755,13 +1755,8 @@ public class SegmentPreProcessorTest { } segmentMetadata = new SegmentMetadataImpl(_indexDir); segmentMetadata.getColumnMetadataMap().forEach((k, v) -> { - if (v.hasDictionary()) { -
assertNotNull(v.getMinValue(), "checking column: " + k); - assertNotNull(v.getMaxValue(), "checking column: " + k); - } else { - assertNull(v.getMinValue(), "checking column: " + k); - assertNull(v.getMaxValue(), "checking column: " + k); - } + assertNotNull(v.getMinValue(), "checking column: " + k); + assertNotNull(v.getMaxValue(), "checking column: " + k); }); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org