This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ffbc5b0e52 Ensure min/max value generation in the segment metadata.  (#10891)
ffbc5b0e52 is described below

commit ffbc5b0e52e2c8ba507f088ec1fd39996f22e8c1
Author: Abhishek Sharma <abhishek.sha...@spothero.com>
AuthorDate: Mon Jul 3 19:19:07 2023 -0400

    Ensure min/max value generation in the segment metadata.  (#10891)
---
 .../ColumnMinMaxValueGenerator.java                | 287 +++++++++++++++++----
 .../index/loader/SegmentPreProcessorTest.java      |   9 +-
 2 files changed, 245 insertions(+), 51 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 2298d72cd9..5cfa637ee2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
@@ -29,6 +30,11 @@ import 
org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -37,6 +43,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.ByteArray;
 
 import static org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -119,65 +126,257 @@ public class ColumnMinMaxValueGenerator {
 
   private boolean needAddColumnMinMaxValueForColumn(String columnName) {
     ColumnMetadata columnMetadata = 
_segmentMetadata.getColumnMetadataFor(columnName);
-    return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == 
null
-        && columnMetadata.getMaxValue() == null && 
!columnMetadata.isMinMaxValueInvalid();
+    return columnMetadata.getMinValue() == null && 
columnMetadata.getMaxValue() == null
+        && !columnMetadata.isMinMaxValueInvalid();
   }
 
   private void addColumnMinMaxValueForColumn(String columnName)
       throws Exception {
-    // Skip column without dictionary or with min/max value already set
+    // Skip column with min/max value already set
     ColumnMetadata columnMetadata = 
_segmentMetadata.getColumnMetadataFor(columnName);
-    if (!columnMetadata.hasDictionary() || columnMetadata.getMinValue() != null
-        || columnMetadata.getMaxValue() != null) {
+    if (columnMetadata.getMinValue() != null || columnMetadata.getMaxValue() 
!= null) {
       return;
     }
 
-    PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, 
StandardIndexes.dictionary());
     DataType dataType = columnMetadata.getDataType().getStoredType();
-    int length = columnMetadata.getCardinality();
-    switch (dataType) {
-      case INT:
-        try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, 
length)) {
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              intDictionary.getStringValue(0), 
intDictionary.getStringValue(length - 1));
-        }
-        break;
-      case LONG:
-        try (LongDictionary longDictionary = new 
LongDictionary(dictionaryBuffer, length)) {
+    if (columnMetadata.hasDictionary()) {
+      PinotDataBuffer dictionaryBuffer = 
_segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
+      int length = columnMetadata.getCardinality();
+      switch (dataType) {
+        case INT:
+          try (IntDictionary intDictionary = new 
IntDictionary(dictionaryBuffer, length)) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                intDictionary.getStringValue(0), 
intDictionary.getStringValue(length - 1));
+          }
+          break;
+        case LONG:
+          try (LongDictionary longDictionary = new 
LongDictionary(dictionaryBuffer, length)) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                longDictionary.getStringValue(0), 
longDictionary.getStringValue(length - 1));
+          }
+          break;
+        case FLOAT:
+          try (FloatDictionary floatDictionary = new 
FloatDictionary(dictionaryBuffer, length)) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                floatDictionary.getStringValue(0), 
floatDictionary.getStringValue(length - 1));
+          }
+          break;
+        case DOUBLE:
+          try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(dictionaryBuffer, length)) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                doubleDictionary.getStringValue(0), 
doubleDictionary.getStringValue(length - 1));
+          }
+          break;
+        case STRING:
+          try (StringDictionary stringDictionary = new 
StringDictionary(dictionaryBuffer, length,
+              columnMetadata.getColumnMaxLength())) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                stringDictionary.getStringValue(0), 
stringDictionary.getStringValue(length - 1));
+          }
+          break;
+        case BYTES:
+          try (BytesDictionary bytesDictionary = new 
BytesDictionary(dictionaryBuffer, length,
+              columnMetadata.getColumnMaxLength())) {
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                bytesDictionary.getStringValue(0), 
bytesDictionary.getStringValue(length - 1));
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + dataType 
+ " for column: " + columnName);
+      }
+    } else {
+      // setting min/max for non-dictionary columns.
+      int numDocs = columnMetadata.getTotalDocs();
+      boolean isSingleValueField = 
_segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
+      PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName, 
StandardIndexes.forward());
+      switch (dataType) {
+        case INT: {
+          int min = Integer.MAX_VALUE;
+          int max = Integer.MIN_VALUE;
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.INT); ChunkReaderContext readerContext 
= rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  int value = rawIndexReader.getInt(docId, readerContext);
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.INT); ChunkReaderContext readerContext 
= rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  int[] value = rawIndexReader.getIntMV(docId, readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    min = Math.min(min, value[i]);
+                    max = Math.max(max, value[i]);
+                  }
+                }
+            }
+          }
           
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              longDictionary.getStringValue(0), 
longDictionary.getStringValue(length - 1));
-        }
-        break;
-      case FLOAT:
-        try (FloatDictionary floatDictionary = new 
FloatDictionary(dictionaryBuffer, length)) {
+              String.valueOf(min), String.valueOf(max));
+         }
+         break;
+        case LONG: {
+          long min = Long.MAX_VALUE;
+          long max = Long.MIN_VALUE;
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.LONG); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  long value = rawIndexReader.getLong(docId, readerContext);
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.LONG); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  long[] value = rawIndexReader.getLongMV(docId, 
readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    min = Math.min(min, value[i]);
+                    max = Math.max(max, value[i]);
+                  }
+                }
+            }
+          }
           
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              floatDictionary.getStringValue(0), 
floatDictionary.getStringValue(length - 1));
-        }
-        break;
-      case DOUBLE:
-        try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(dictionaryBuffer, length)) {
+                String.valueOf(min), String.valueOf(max));
+         }
+         break;
+        case FLOAT: {
+          float min = Float.POSITIVE_INFINITY; // identity for Math.min, so the first value read always replaces it
+          float max = Float.NEGATIVE_INFINITY; // NOT Float.MIN_VALUE: that is the smallest positive float (~1.4e-45)
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.FLOAT); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  float value = rawIndexReader.getFloat(docId, readerContext);
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.FLOAT); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  float[] value = rawIndexReader.getFloatMV(docId, 
readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    min = Math.min(min, value[i]);
+                    max = Math.max(max, value[i]);
+                  }
+                }
+            }
+          }
           
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              doubleDictionary.getStringValue(0), 
doubleDictionary.getStringValue(length - 1));
-        }
-        break;
-      case STRING:
-        try (StringDictionary stringDictionary = new 
StringDictionary(dictionaryBuffer, length,
-            columnMetadata.getColumnMaxLength())) {
+                String.valueOf(min), String.valueOf(max));
+         }
+         break;
+        case DOUBLE: {
+          double min = Double.POSITIVE_INFINITY; // identity for Math.min, so the first value read always replaces it
+          double max = Double.NEGATIVE_INFINITY; // NOT Double.MIN_VALUE: that is the smallest positive double (~4.9e-324)
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.DOUBLE); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  double value = rawIndexReader.getDouble(docId, 
readerContext);
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.DOUBLE); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  double[] value = rawIndexReader.getDoubleMV(docId, 
readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    min = Math.min(min, value[i]);
+                    max = Math.max(max, value[i]);
+                  }
+                }
+            }
+          }
           
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              stringDictionary.getStringValue(0), 
stringDictionary.getStringValue(length - 1));
-        }
-        break;
-      case BYTES:
-        try (BytesDictionary bytesDictionary = new 
BytesDictionary(dictionaryBuffer, length,
-            columnMetadata.getColumnMaxLength())) {
+                String.valueOf(min), String.valueOf(max));
+          }
+          break;
+        case STRING: {
+          String min = null;
+          String max = null;
+          if (isSingleValueField) {
+            try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(forwardBuffer,
+                DataType.STRING); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  String value = rawIndexReader.getString(docId, 
readerContext);
+                  if (min == null || StringUtils.compare(min, value) > 0) {
+                    min = value;
+                  }
+                  if (max == null || StringUtils.compare(max, value) < 0) {
+                    max = value;
+                  }
+                }
+            }
+          } else {
+            try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(forwardBuffer,
+                DataType.STRING); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  String[] value = rawIndexReader.getStringMV(docId, 
readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    if (min == null || StringUtils.compare(min, value[i]) > 0) 
{
+                      min = value[i];
+                    }
+                    if (max == null || StringUtils.compare(max, value[i]) < 0) 
{
+                      max = value[i];
+                    }
+                  }
+                }
+            }
+          }
+          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, min, max);
+          }
+          break;
+        case BYTES: {
+          byte[] min = null; // null means "no value seen yet"
+          byte[] max = null;
+          if (isSingleValueField) {
+            try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(forwardBuffer,
+                DataType.BYTES); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  byte[] value = rawIndexReader.getBytes(docId, readerContext);
+                  if (min == null || ByteArray.compare(value, min) < 0) { // value sorts before the current min
+                    min = value;
+                  }
+                  if (max == null || ByteArray.compare(value, max) > 0) { // value sorts after the current max
+                    max = value;
+                  }
+                }
+            }
+          } else {
+            try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(forwardBuffer,
+                DataType.BYTES); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
+                for (int docId = 0; docId < numDocs; docId++) {
+                  byte[][] value = rawIndexReader.getBytesMV(docId, 
readerContext);
+                  for (int i = 0; i < value.length; i++) {
+                    if (min == null || ByteArray.compare(value[i], min) < 0) { // same orientation as the SV branch
+                      min = value[i];
+                    }
+                    if (max == null || ByteArray.compare(value[i], max) > 0) {
+                      max = value[i];
+                    }
+                  }
+                }
+            }
+          }
           
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              bytesDictionary.getStringValue(0), 
bytesDictionary.getStringValue(length - 1));
-        }
-        break;
-      default:
-        throw new IllegalStateException("Unsupported data type: " + dataType + 
" for column: " + columnName);
+              String.valueOf(new ByteArray(min)), String.valueOf(new 
ByteArray(max)));
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + dataType 
+ " for column: " + columnName);
+      }
     }
-
     _minMaxValueAdded = true;
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 16d265c04d..0bdaefca0a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -1755,13 +1755,8 @@ public class SegmentPreProcessorTest {
     }
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
     segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
-      if (v.hasDictionary()) {
-        assertNotNull(v.getMinValue(), "checking column: " + k);
-        assertNotNull(v.getMaxValue(), "checking column: " + k);
-      } else {
-        assertNull(v.getMinValue(), "checking column: " + k);
-        assertNull(v.getMaxValue(), "checking column: " + k);
-      }
+      assertNotNull(v.getMinValue(), "checking column: " + k);
+      assertNotNull(v.getMaxValue(), "checking column: " + k);
     });
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to