This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/mv-fwd-index by this push:
     new e8bca30  Wiring in the segment creation driver Impl
e8bca30 is described below

commit e8bca30473c28e9e9efe371bd06442955da7d433
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sun Oct 17 20:46:09 2021 -0700

    Wiring in the segment creation driver Impl
---
 .../apache/pinot/common/utils/PinotDataType.java   |  47 ++-
 .../pinot/core/minion/RawIndexConverter.java       |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |   3 +-
 .../impl/VarByteChunkSVForwardIndexWriter.java     |  43 ++-
 .../creator/impl/SegmentColumnarIndexCreator.java  | 422 ++++++++++++++++-----
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  22 +-
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  23 +-
 .../stats/BytesColumnPredIndexStatsCollector.java  |  56 ++-
 .../local/segment/store/FilePerIndexDirectory.java |   7 +-
 .../MultiValueVarByteRawIndexCreatorTest.java      |  45 ++-
 .../segment/index/creator/RawIndexCreatorTest.java | 147 +++++--
 .../converter/DictionaryToRawIndexConverter.java   |   2 +-
 12 files changed, 627 insertions(+), 192 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 49406ba..a94fee9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -29,7 +29,6 @@ import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
-
 /**
  * The <code>PinotDataType</code> enum represents the data type of a value in a row from recordReader and provides
  * utility methods to convert value across types if applicable.
@@ -583,7 +582,8 @@ public enum PinotDataType {
       try {
         return Base64.getDecoder().decode(value.toString());
       } catch (Exception e) {
-        throw new RuntimeException("Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
+        throw new RuntimeException(
+            "Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
             e);
       }
     }
@@ -769,6 +769,12 @@ public enum PinotDataType {
       return sourceType.toStringArray(value);
     }
   },
+  BYTES_ARRAY {
+    @Override
+    public byte[][] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toBytesArray(value);
+    }
+  },
 
   OBJECT_ARRAY;
 
@@ -817,7 +823,8 @@ public enum PinotDataType {
       return JsonUtils.objectToString(value);
     } catch (Exception e) {
       throw new RuntimeException(
-          "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: " + value, e);
+          "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: "
+              + value, e);
     }
   }
 }
@@ -1020,6 +1027,24 @@ public enum PinotDataType {
     }
   }
 
+  public byte[][] toBytesArray(Object value) {
+    if (value instanceof byte[][]) {
+      return (byte[][]) value;
+    }
+    if (isSingleValue()) {
+      return new byte[][]{toBytes(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      byte[][] bytesArray = new byte[length][];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        bytesArray[i] = singleValueType.toBytes(valueArray[i]);
+      }
+      return bytesArray;
+    }
+  }
+
   private static Object[] toObjectArray(Object array) {
     Class<?> componentType = array.getClass().getComponentType();
     if (componentType.isPrimitive()) {
@@ -1042,7 +1067,8 @@ public enum PinotDataType {
   }
 
   public Object convert(Object value, PinotDataType sourceType) {
-    throw new UnsupportedOperationException("Cannot convert value from " + sourceType + " to " + this);
+    throw new UnsupportedOperationException(
+        "Cannot convert value from " + sourceType + " to " + this);
   }
 
   /**
@@ -1082,6 +1108,8 @@ public enum PinotDataType {
       return DOUBLE;
     case STRING_ARRAY:
       return STRING;
+    case BYTES_ARRAY:
+      return BYTES;
     case OBJECT_ARRAY:
       return OBJECT;
     default:
@@ -1151,6 +1179,9 @@ public enum PinotDataType {
     if (cls == Short.class) {
       return SHORT_ARRAY;
     }
+    if (cls == byte[].class) {
+      return BYTES_ARRAY;
+    }
     return OBJECT_ARRAY;
   }
 
@@ -1210,7 +1241,8 @@ public enum PinotDataType {
       if (fieldSpec.isSingleValueField()) {
         return BYTES;
       } else {
-        throw new IllegalStateException("There is no multi-value type for BYTES");
+        return BYTES_ARRAY;
+//        throw new IllegalStateException("There is no multi-value type for BYTES");
       }
     default:
       throw new UnsupportedOperationException(
@@ -1253,7 +1285,8 @@ public enum PinotDataType {
     case STRING_ARRAY:
       return STRING_ARRAY;
     default:
-      throw new IllegalStateException("Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
+      throw new IllegalStateException(
+          "Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f1c7fbb..a3b2e24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -207,7 +207,7 @@ public class RawIndexConverter {
     int numDocs = _originalSegmentMetadata.getTotalDocs();
     int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
     try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
+        .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
             lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ab5e028..ae36cc6 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -81,11 +81,12 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   protected static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
   protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
   protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
-      Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+      Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "RandomAirports");
   protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
   protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
   private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
   private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
+  private static final List<String> DEFAULT_TEXT_INDEX_COLUMNS = Collections.singletonList("RandomAirports");
   protected static final int DEFAULT_NUM_REPLICAS = 1;
   protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index 4444512..44ea807 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -25,34 +25,35 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 
-
 /**
  * Class to write out variable length bytes into a single column.
  *
  * The layout of the file is as follows:
  * <p> Header Section: </p>
  * <ul>
- *   <li> Integer: File format version. </li>
- *   <li> Integer: Total number of chunks. </li>
- *   <li> Integer: Number of docs per chunk. </li>
- *   <li> Integer: Length of longest entry (in bytes). </li>
- *   <li> Integer: Total number of docs (version 2 onwards). </li>
- *   <li> Integer: Compression type enum value (version 2 onwards). </li>
- *   <li> Integer: Start offset of data header (version 2 onwards). </li>
- *   <li> Integer array: Integer offsets for all chunks in the data (upto version 2),
- *        Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
+ * <li> Integer: File format version. </li>
+ * <li> Integer: Total number of chunks. </li>
+ * <li> Integer: Number of docs per chunk. </li>
+ * <li> Integer: Length of longest entry (in bytes). </li>
+ * <li> Integer: Total number of docs (version 2 onwards). </li>
+ * <li> Integer: Compression type enum value (version 2 onwards). </li>
+ * <li> Integer: Start offset of data header (version 2 onwards).
+ * </li>
+ * <li> Integer array: Integer offsets for all chunks in the data (upto version 2),
+ * Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
  * </ul>
 *
 * <p> Individual Chunks: </p>
 * <ul>
- * <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for missing rows. </li>
- * <li> Data bytes. </li>
+ * <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for
+ * missing rows. </li>
+ * <li> Data bytes. </li>
 * </ul>
 *
 * Only sequential writes are supported.
 */
 @NotThreadSafe
 public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+
   public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
   private final int _chunkHeaderSize;
@@ -68,9 +69,11 @@ public class VarByteChunkSVForwardIndexWri
    * @param numDocsPerChunk Number of documents per chunk.
    * @param lengthOfLongestEntry Length of longest entry (in bytes)
    * @param writerVersion writer format version
-   * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
+   * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is
+   * not found.
    */
-  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
+  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType,
+      int totalDocs,
       int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
       throws FileNotFoundException {
     super(file, compressionType, totalDocs, numDocsPerChunk,
@@ -119,12 +122,12 @@ public class VarByteChunkSVForwardIndexWri
   /**
    * Helper method to compress and write the current chunk.
    * <ul>
-   * <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled chunks. </li>
-   * <li> Compresses and writes the chunk to the data file. </li>
-   * <li> Updates the header with the current chunks offset. </li>
-   * <li> Clears up the buffers, so that they can be reused. </li>
+   * <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled
+   * chunks. </li>
+   * <li> Compresses and writes the chunk to the data file. </li>
+   * <li> Updates the header with the current chunks offset. </li>
+   * <li> Clears up the buffers, so that they can be reused. </li>
    * </ul>
-   *
    */
   protected void writeChunk() {
     // For partially filled chunks, we still need to clear the offsets for remaining rows, as we reuse this buffer.
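For context, a rough sketch of how a caller sizes and drives this writer. The file name and
sizes below are made up; the constructor signature and putBytes() are the ones in the class
above:

    // Illustrative: writing three variable-length byte[] rows sequentially.
    import java.io.File;
    import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
    import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;

    public class VarByteWriterSketch {
      public static void main(String[] args) throws Exception {
        byte[][] rows = {new byte[]{1}, new byte[]{2, 3}, new byte[]{4, 5, 6}};
        int lengthOfLongestEntry = 3; // max row size in bytes (made up)
        try (VarByteChunkSVForwardIndexWriter writer = new VarByteChunkSVForwardIndexWriter(
            new File("/tmp/col.fwd"), ChunkCompressionType.SNAPPY, rows.length,
            1000 /* numDocsPerChunk */, lengthOfLongestEntry,
            BaseChunkSVForwardIndexWriter.DEFAULT_VERSION)) {
          for (byte[] row : rows) {
            writer.putBytes(row); // only sequential writes are supported
          }
        }
      }
    }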
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2829e8e..9d6b32b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
 *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.File;
 import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +36,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -82,12 +86,12 @@ import org.slf4j.LoggerFactory;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
 
-
 /**
  * Segment creator which writes data in a columnar form.
  */
 // TODO: check resource leaks
 public class SegmentColumnarIndexCreator implements SegmentCreator {
+  // TODO Refactor class name to match interface name
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
 
   // Allow at most 512 characters for the metadata property
@@ -112,7 +116,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Map<String, Map<String, String>> _columnProperties;
 
   @Override
-  public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
+  public void init(SegmentGeneratorConfig segmentCreationSpec,
+      SegmentIndexCreationInfo segmentIndexCreationInfo,
       Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
       throws Exception {
     _docIdCounter = 0;
@@ -121,7 +126,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     _columnProperties = segmentCreationSpec.getColumnProperties();
 
     // Check that the output directory does not exist
-    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
+    Preconditions
+        .checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
 
     Preconditions.checkState(outDir.mkdirs(), "Failed to create output directory: %s", outDir);
     _indexDir = outDir;
@@ -164,7 +170,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
     for (String columnName : h3IndexConfigs.keySet()) {
       Preconditions
-          .checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema",
+          .checkState(schema.hasColumn(columnName),
+              "Cannot create H3 index for column: %s because it is not in schema",
               columnName);
     }
@@ -178,15 +185,18 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       String columnName = fieldSpec.getName();
       DataType storedType = fieldSpec.getDataType().getStoredType();
       ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
-      Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
-      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
+      Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s",
+          columnName);
+      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec,
+          fieldSpec);
 
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
         // Initialize dictionary creator
         SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
+                fieldSpec, _indexDir,
                 indexCreationInfo.isUseVarLengthDictionary());
         _dictionaryCreatorMap.put(columnName, dictionaryCreator);
@@ -194,8 +204,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         try {
           dictionaryCreator.build();
         } catch (Exception e) {
-          LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
-              fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry());
+          LOGGER.error(
+              "Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
+              fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(),
+              dictionaryCreator.getNumBytesPerEntry());
           throw e;
         }
@@ -204,14 +216,17 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         if (fieldSpec.isSingleValueField()) {
           if (indexCreationInfo.isSorted()) {
             _forwardIndexCreatorMap
-                .put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
+                .put(columnName,
+                    new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _forwardIndexCreatorMap.put(columnName,
-                new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
+                new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+                    _totalDocs));
           }
         } else {
           _forwardIndexCreatorMap.put(columnName,
-              new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs,
+              new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+                  _totalDocs,
                   indexCreationInfo.getTotalNumberOfEntries()));
         }
@@ -219,7 +234,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
           if (segmentCreationSpec.isOnHeap()) {
             _invertedIndexCreatorMap
-                .put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
+                .put(columnName,
+                    new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _invertedIndexCreatorMap.put(columnName,
                 new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
@@ -230,20 +246,33 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         // Create raw index
 
         // TODO: add support to multi-value column and inverted index
-        Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
-            columnName);
+//        Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
+//            columnName);
         Preconditions.checkState(!invertedIndexColumns.contains(columnName),
             "Cannot create inverted index for raw index column: %s", columnName);
 
-        ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec);
+        ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec,
+            fieldSpec);
 
         // Initialize forward index creator
         boolean deriveNumDocsPerChunk =
            shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
-        int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
-        _forwardIndexCreatorMap.put(columnName,
-            getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
-                indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
+        int writerVersion = rawIndexWriterVersion(columnName,
+            segmentCreationSpec.getColumnProperties());
+        if (fieldSpec.isSingleValueField()) {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType,
+                  _totalDocs,
+                  indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk,
+                  writerVersion));
+        } else {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType,
+                  _totalDocs,
+                  indexCreationInfo.getLengthOfLongestEntry(),
+                  indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
+                  writerVersion));
+        }
       }
 
       if (textIndexColumns.contains(columnName)) {
@@ -251,18 +280,22 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "Text index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns");
+            .checkState(storedType == DataType.STRING,
+                "Text index is currently only supported on STRING type columns");
         _textIndexCreatorMap
-            .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+            .put(columnName,
+                new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
       }
 
       if (fstIndexColumns.contains(columnName)) {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "FST index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns");
+            .checkState(storedType == DataType.STRING,
+                "FST index is currently only supported on STRING type columns");
         Preconditions
-            .checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns");
+            .checkState(dictEnabledColumn,
+                "FST index is currently only supported on dictionary-encoded columns");
         _fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
             (String[]) indexCreationInfo.getSortedUniqueElementsArray()));
       }
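A condensed sketch of the new dispatch above, pulled out as a helper for readability. All
parameter values are hypothetical; the two factory methods are the ones added later in this
diff:

    import java.io.File;
    import java.io.IOException;
    import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
    import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
    import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
    import org.apache.pinot.spi.data.FieldSpec.DataType;

    public class RawCreatorDispatchSketch {
      // Mirrors the SV/MV branch added above; values here are made up.
      static ForwardIndexCreator newRawCreator(File indexDir, String column, DataType storedType,
          boolean singleValue, int totalDocs, int lengthOfLongestEntry, int maxNumMVElements)
          throws IOException {
        return singleValue
            ? SegmentColumnarIndexCreator.getRawIndexCreatorForSVColumn(indexDir,
                ChunkCompressionType.SNAPPY, column, storedType, totalDocs, lengthOfLongestEntry,
                false /* deriveNumDocsPerChunk */, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION)
            : SegmentColumnarIndexCreator.getRawIndexCreatorForMVColumn(indexDir,
                ChunkCompressionType.SNAPPY, column, storedType, totalDocs, lengthOfLongestEntry,
                maxNumMVElements, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
      }
    }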
@@ -271,7 +304,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "Json index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns");
+            .checkState(storedType == DataType.STRING,
+                "Json index is currently only supported on STRING columns");
         JsonIndexCreator jsonIndexCreator =
             segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
                 : new OffHeapJsonIndexCreator(_indexDir, columnName);
@@ -281,11 +315,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
       if (h3IndexConfig != null) {
         Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns");
-        Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
+            .checkState(fieldSpec.isSingleValueField(),
+                "H3 index is currently only supported on single-value columns");
+        Preconditions.checkState(storedType == DataType.BYTES,
+            "H3 index is currently only supported on BYTES columns");
         H3IndexResolution resolution = h3IndexConfig.getResolution();
         GeoSpatialIndexCreator h3IndexCreator =
-            segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution)
+            segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName,
+                resolution)
                 : new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
         _h3IndexCreatorMap.put(columnName, h3IndexCreator);
       }
@@ -293,7 +330,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       _nullHandlingEnabled = _config.isNullHandlingEnabled();
       if (_nullHandlingEnabled) {
         // Initialize Null value vector map
-        _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName));
+        _nullValueVectorCreatorMap
+            .put(columnName, new NullValueVectorCreator(_indexDir, columnName));
       }
     }
   }
@@ -308,7 +346,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     return false;
   }
 
-  public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
+  public static int rawIndexWriterVersion(String columnName,
+      Map<String, Map<String, String>> columnProperties) {
     if (columnProperties != null && columnProperties.get(columnName) != null) {
       Map<String, String> properties = columnProperties.get(columnName);
       String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
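As a usage note, the raw writer version is read from per-column properties in the table
config. A hedged sketch (the column name and version value are invented; the property key
FieldConfig.RAW_INDEX_WRITER_VERSION is the one referenced above):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
    import org.apache.pinot.spi.config.table.FieldConfig;

    public class WriterVersionSketch {
      public static void main(String[] args) {
        Map<String, Map<String, String>> columnProperties = Collections.singletonMap(
            "myRawColumn", Collections.singletonMap(FieldConfig.RAW_INDEX_WRITER_VERSION, "3"));
        int version =
            SegmentColumnarIndexCreator.rawIndexWriterVersion("myRawColumn", columnProperties);
        System.out.println(version); // 3
      }
    }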
@@ -321,20 +360,26 @@
   }
 
   /**
-   * Helper method that returns compression type to use based on segment creation spec and field type.
+   * Helper method that returns compression type to use based on segment creation spec and field
+   * type.
    * <ul>
-   * <li> Returns compression type from segment creation spec, if specified there.</li>
-   * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely
-   * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that
-   * case, clients are expected to explicitly specify the appropriate compression type in the spec. </li>
+   * <li> Returns compression type from segment creation spec, if specified there.</li>
+   * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics
+   * are likely
+   * to be spread in different chunks after applying predicates. Same could be true for dimensions,
+   * but in that
+   * case, clients are expected to explicitly specify the appropriate compression type in the spec.
+   * </li>
    * </ul>
+   *
    * @param segmentCreationSpec Segment creation spec
    * @param fieldSpec Field spec for the column
    * @return Compression type to use
    */
   private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec,
       FieldSpec fieldSpec) {
-    ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
+    ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType()
+        .get(fieldSpec.getName());
     if (compressionType == null) {
       if (fieldSpec.getFieldType() == FieldType.METRIC) {
         return ChunkCompressionType.PASS_THROUGH;
@@ -350,8 +395,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * Returns true if dictionary should be created for a column, false otherwise.
    * Currently there are two sources for this config:
    * <ul>
-   * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
-   * <li> SegmentGeneratorConfig</li>
+   * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+   * <li> SegmentGeneratorConfig</li>
    * </ul>
    *
    * This method gives preference to the SegmentGeneratorConfig first.
@@ -361,15 +406,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param spec Field spec for the column
    * @return True if dictionary should be created for the column, false otherwise
    */
-  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info,
+      SegmentGeneratorConfig config,
       FieldSpec spec) {
     String column = spec.getName();
     if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
         .containsKey(column)) {
-      if (!spec.isSingleValueField()) {
-        throw new RuntimeException(
-            "Creation of indices without dictionaries is supported for single valued columns only.");
-      }
+//      if (!spec.isSingleValueField()) {
+//        throw new RuntimeException(
+//            "Creation of indices without dictionaries is supported for single valued columns only.");
+//      }
       return false;
     }
     return info.isCreateDictionary();
@@ -387,16 +433,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       throw new RuntimeException("Null value for column:" + columnName);
     }
 
-    boolean isSingleValue = _schema.getFieldSpecFor(columnName).isSingleValueField();
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+    //get dictionaryCreator, will be null if column is not dictionaryEncoded
     SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
 
-    if (isSingleValue) {
-      // SV column
-      // text-index enabled SV column
-      TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
-      if (textIndexCreator != null) {
-        textIndexCreator.add((String) columnValueToIndex);
-      }
+    // text-index
+    TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
+    if (textIndexCreator != null) {
+      textIndexCreator.add((String) columnValueToIndex);
+    }
+
+    if (fieldSpec.isSingleValueField()) {
+      // Single Value column
       JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName);
       if (jsonIndexCreator != null) {
         jsonIndexCreator.add((String) columnValueToIndex);
@@ -411,7 +460,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
           // store the docID -> dictID mapping in forward index
           forwardIndexCreator.putDictId(dictId);
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+              .get(columnName);
           if (invertedIndexCreator != null) {
             // if inverted index enabled during segment creation,
             // then store dictID -> docID mapping in inverted index
@@ -423,7 +473,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
             // for text index on raw columns, check the config to determine if actual raw value should
             // be stored or not
-            columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            columnValueToIndex = _columnProperties.get(columnName)
+                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
             if (columnValueToIndex == null) {
               columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
             }
@@ -452,12 +503,120 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         }
       }
     } else {
-      // MV column (always dictionary encoded)
-      int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-      forwardIndexCreator.putDictIdMV(dictIds);
-      DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
-      if (invertedIndexCreator != null) {
-        invertedIndexCreator.add(dictIds, dictIds.length);
+      if (dictionaryCreator != null) {
+        //dictionary encoded
+        int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+        forwardIndexCreator.putDictIdMV(dictIds);
+        DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+            .get(columnName);
+        if (invertedIndexCreator != null) {
+          invertedIndexCreator.add(dictIds, dictIds.length);
+        }
+      } else {
+        // for text index on raw columns, check the config to determine if actual raw value should
+        // be stored or not
+        if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+          Object value = _columnProperties.get(columnName)
+              .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+          if (value == null) {
+            value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+          }
+          if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+            value = String.valueOf(value);
+            int length = ((String[]) columnValueToIndex).length;
+            columnValueToIndex = new String[length];
+            Arrays.fill((String[]) columnValueToIndex, value);
+          } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+            int length = ((byte[][]) columnValueToIndex).length;
+            columnValueToIndex = new byte[length][];
+            Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+          } else {
+            throw new RuntimeException(
+                "Text Index is only supported for STRING and BYTES stored type");
+          }
+        }
+        switch (forwardIndexCreator.getValueType()) {
+          case INT:
+            if (columnValueToIndex instanceof int[]) {
+              forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              int[] array = new int[((Object[]) columnValueToIndex).length];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putIntMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          case LONG:
+            if (columnValueToIndex instanceof long[]) {
+              forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              long[] array = new long[((Object[]) columnValueToIndex).length];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (Long) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putLongMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          case FLOAT:
+            if (columnValueToIndex instanceof float[]) {
+              forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              float[] array = new float[((Object[]) columnValueToIndex).length];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (Float) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putFloatMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          case DOUBLE:
+            if (columnValueToIndex instanceof double[]) {
+              forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              double[] array = new double[((Object[]) columnValueToIndex).length];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (Double) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putDoubleMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          case STRING:
+            if (columnValueToIndex instanceof String[]) {
+              forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              String[] array = new String[((Object[]) columnValueToIndex).length];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (String) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putStringMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          case BYTES:
+            if (columnValueToIndex instanceof byte[][]) {
+              forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
+            } else if (columnValueToIndex instanceof Object[]) {
+              byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
+              for (int i = 0; i < array.length; i++) {
+                array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+              }
+              forwardIndexCreator.putBytesMV(array);
+            } else {
+              //TODO: is this possible?
+            }
+            break;
+          default:
+            throw new IllegalStateException();
+        }
       }
     }
 
@@ -468,6 +627,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       }
     }
   }
+
     _docIdCounter++;
   }
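The switch above repeats one pattern per type: an MV cell arrives either as a primitive
array or as a boxed Object[], and the raw MV forward-index path narrows it before calling
the put method. Extracted as a standalone helper for clarity (a sketch, not part of the
commit):

    // Illustrative: the Object[] -> int[] unboxing used in the INT case above.
    static int[] toIntArray(Object columnValueToIndex) {
      if (columnValueToIndex instanceof int[]) {
        return (int[]) columnValueToIndex;
      }
      Object[] boxed = (Object[]) columnValueToIndex;
      int[] array = new int[boxed.length];
      for (int i = 0; i < boxed.length; i++) {
        array[i] = (Integer) boxed[i]; // each element is a boxed Integer
      }
      return array;
    }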
@@ -491,7 +651,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   @Override
   public void seal()
       throws ConfigurationException, IOException {
-    for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
+    for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap
+        .values()) {
       invertedIndexCreator.seal();
     }
     for (TextIndexCreator textIndexCreator : _textIndexCreatorMap.values()) {
@@ -515,10 +676,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private void writeMetadata()
       throws ConfigurationException {
     PropertiesConfiguration properties =
-        new PropertiesConfiguration(new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
+        new PropertiesConfiguration(
+            new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
 
     properties.setProperty(SEGMENT_CREATOR_VERSION, _config.getCreatorVersion());
-    properties.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+    properties.setProperty(SEGMENT_PADDING_CHARACTER,
+        String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
     properties.setProperty(SEGMENT_NAME, _segmentName);
     properties.setProperty(TABLE_NAME, _config.getTableName());
     properties.setProperty(DIMENSIONS, _config.getDimensions());
@@ -530,7 +693,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
     // Write time related metadata (start time, end time, time unit)
     if (timeColumnName != null) {
-      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap.get(timeColumnName);
+      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap
+          .get(timeColumnName);
       if (timeColumnIndexCreationInfo != null) {
         long startTime;
         long endTime;
@@ -548,7 +712,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
         if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
           // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
-          DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(_config.getSimpleDateFormat());
+          DateTimeFormatter dateTimeFormatter = DateTimeFormat
+              .forPattern(_config.getSimpleDateFormat());
           startTime = dateTimeFormatter.parseMillis(startTimeStr);
           endTime = dateTimeFormatter.parseMillis(endTimeStr);
           timeUnit = TimeUnit.MILLISECONDS;
@@ -575,10 +740,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
         if (!_config.isSkipTimeValueCheck()) {
           Interval timeInterval =
-              new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+              new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime),
+                  DateTimeZone.UTC);
           Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
               "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
-              timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
+              timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(),
+              timeColumnName,
               TimeUtils.VALID_TIME_INTERVAL);
         }
 
@@ -596,7 +763,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       String column = entry.getKey();
       ColumnIndexCreationInfo columnIndexCreationInfo = entry.getValue();
       SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(column);
-      int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
+      int dictionaryElementSize =
+          (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
 
       // TODO: after fixing the server-side dependency on HAS_INVERTED_INDEX and deployed, set HAS_INVERTED_INDEX
       // properly
@@ -620,17 +788,24 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
       boolean hasJsonIndex = _jsonIndexCreatorMap.containsKey(column);
 
-      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
-          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex, textIndexType,
+      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs,
+          _schema.getFieldSpecFor(column),
+          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex,
+          textIndexType,
           hasFSTIndex, hasJsonIndex);
     }
 
     properties.save();
+//    StringWriter sw = new StringWriter();
+//    properties.save(sw);
+//    System.out.println(sw.toString());
   }
 
   public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
-      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
-      int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType, boolean hasFSTIndex,
+      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec,
+      boolean hasDictionary,
+      int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType,
+      boolean hasFSTIndex,
       boolean hasJsonIndex) {
     int cardinality = columnIndexCreationInfo.getDistinctValueCount();
     properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
@@ -639,35 +814,44 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
     properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT),
         String.valueOf(PinotDataBitSet.getNumBitsPerValue(cardinality - 1)));
-    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize));
-    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
-    properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted()));
-    properties.setProperty(getKeyFor(column, HAS_NULL_VALUE), String.valueOf(columnIndexCreationInfo.hasNulls()));
+    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE),
+        String.valueOf(dictionaryElementSize));
+    properties
+        .setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
+    properties.setProperty(getKeyFor(column, IS_SORTED),
+        String.valueOf(columnIndexCreationInfo.isSorted()));
+    properties.setProperty(getKeyFor(column, HAS_NULL_VALUE),
+        String.valueOf(columnIndexCreationInfo.hasNulls()));
     properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
     properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), textIndexType.name());
     properties.setProperty(getKeyFor(column, HAS_INVERTED_INDEX), String.valueOf(hasInvertedIndex));
     properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
     properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex));
-    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED),
+        String.valueOf(fieldSpec.isSingleValueField()));
     properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
     properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
     properties
-        .setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+        .setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+            String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
 
     PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
     if (partitionFunction != null) {
       properties.setProperty(getKeyFor(column, PARTITION_FUNCTION), partitionFunction.toString());
-      properties.setProperty(getKeyFor(column, NUM_PARTITIONS), columnIndexCreationInfo.getNumPartitions());
-      properties.setProperty(getKeyFor(column, PARTITION_VALUES), columnIndexCreationInfo.getPartitions());
+      properties.setProperty(getKeyFor(column, NUM_PARTITIONS),
+          columnIndexCreationInfo.getNumPartitions());
+      properties.setProperty(getKeyFor(column, PARTITION_VALUES),
+          columnIndexCreationInfo.getPartitions());
     }
 
     // datetime field
     if (fieldSpec.getFieldType().equals(FieldType.DATE_TIME)) {
       DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
       properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
-      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+      properties
+          .setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
     }
 
     // NOTE: Min/max could be null for real-time aggregate metrics.
@@ -685,7 +869,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
-  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
+  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column,
+      String minValue,
       String maxValue) {
     if (isValidPropertyValue(minValue)) {
       properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
     }
@@ -699,9 +884,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * Helper method to check whether the given value is a valid property value.
    * <p>Value is invalid iff:
    * <ul>
-   * <li>It contains more than 512 characters</li>
-   * <li>It contains leading/trailing whitespace</li>
-   * <li>It contains list separator (',')</li>
+   * <li>It contains more than 512 characters</li>
+   * <li>It contains leading/trailing whitespace</li>
+   * <li>It contains list separator (',')</li>
    * </ul>
    */
   @VisibleForTesting
@@ -713,7 +898,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     if (length > METADATA_PROPERTY_LENGTH_LIMIT) {
       return false;
     }
-    if (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(length - 1))) {
+    if (Character.isWhitespace(value.charAt(0)) || Character
+        .isWhitespace(value.charAt(length - 1))) {
       return false;
     }
     return value.indexOf(',') == -1;
   }
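To make the min/max metadata rules above concrete (the values here are invented):

    String minValue = "AA";        // stored: short, no edge whitespace, no comma
    String maxValue = "ZW, intl";  // skipped: contains the ',' list separator
    // addColumnMinMaxValueInfo(...) would write MIN_VALUE but omit MAX_VALUE,
    // since isValidPropertyValue(...) rejects values with a comma, leading or
    // trailing whitespace, or more than 512 characters.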
@@ -731,13 +917,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param column Column name
    * @param totalDocs Total number of documents to index
    * @param lengthOfLongestEntry Length of longest entry
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
    * @param writerVersion version to use for the raw index writer
    * @return raw index creator
-   * @throws IOException
    */
-  public static ForwardIndexCreator getRawIndexCreatorForColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
+      ChunkCompressionType compressionType,
+      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     switch (dataType.getStoredType()) {
@@ -745,14 +933,56 @@
       case LONG:
       case FLOAT:
       case DOUBLE:
-        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType,
             writerVersion);
       case STRING:
       case BYTES:
-        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType,
             lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
       default:
-        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+        throw new UnsupportedOperationException(
+            "Data type not supported for raw indexing: " + dataType);
     }
   }
 
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that the column to be indexed is multi valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param lengthOfLongestEntry Length of longest entry
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
+      ChunkCompressionType compressionType,
+      String column, DataType dataType, final int totalDocs,
+      int lengthOfLongestEntry,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
+      int writerVersion)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType, dataType.getStoredType().size(), maxNumberOfMultiValueElements,
+            deriveNumDocsPerChunk, writerVersion);
+      case STRING:
+      case BYTES:
+        return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType, lengthOfLongestEntry, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
+            writerVersion);
+      default:
+        throw new UnsupportedOperationException(
+            "Data type not supported for raw indexing: " + dataType);
+    }
+  }
@@ -760,8 +990,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   public void close()
       throws IOException {
     FileUtils.close(Iterables
-        .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
-            _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(),
+        .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(),
+            _invertedIndexCreatorMap.values(),
+            _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(),
+            _jsonIndexCreatorMap.values(),
             _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index d608a65..de11500 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -51,13 +52,14 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param maxLength length of longest entry (in bytes)
    */
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
       ChunkCompressionType compressionType, String column,
-      int totalDocs, DataType valueType, int maxLength)
+      int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+        maxNumberOfMultiValueElements, false,
         BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
   }
 
@@ -69,23 +71,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param maxLength length of longest entry (in bytes)
+   * @param maxLengthOfEachEntry length of longest entry (in bytes)
    * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
    * @param writerVersion writer format version
    */
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
       ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk,
+      String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     File file = new File(baseIndexDir,
-        column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     FileUtils.deleteQuietly(file);
+    int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
     int numDocsPerChunk =
-        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
     _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, maxLength,
-        writerVersion);
+        numDocsPerChunk, totalMaxLength, writerVersion);
     _valueType = valueType;
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 465b5f7..9264bde 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -65,9 +65,8 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
       String column, int totalDocs, DataType valueType, int maxTotalContentLength,
       int maxElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength,
-        maxElements,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxTotalContentLength,
+        maxElements, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
   }
 
   /**
@@ -78,25 +77,24 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
-   * @param maxTotalContentLength max total content length
+   * @param maxLength max length for each entry
    * @param maxElements max number of elements
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per
+   *     chunk
    * @param writerVersion writer format version
    */
   public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength,
-      int maxElements,
-      int writerVersion)
+      String column, int totalDocs, DataType valueType,
+      int maxLength, int maxElements, boolean deriveNumDocsPerChunk, int writerVersion)
       throws IOException {
     //we will prepend the actual content with numElements and length array containing length of each element
-    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+    int totalMaxLength = Integer.BYTES + maxElements * Integer.BYTES + maxLength * maxElements;
     File file = new File(baseIndexDir,
         column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk =
-        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
     _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, maxLength,
+        numDocsPerChunk, totalMaxLength,
         writerVersion);
     _valueType = valueType;
   }
@@ -151,6 +149,7 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
 
   @Override
   public void putBytesMV(final byte[][] values) {
+    int totalBytes = 0;
     for (int i = 0; i < values.length; i++) {
       int length = values[i].length;
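To make the row-buffer sizing above concrete, a worked example with invented numbers: each
MV row is stored as one int for the element count, one int per element for its length, and
then the element payloads.

    int maxElements = 50;
    int maxLength = 100; // max bytes per element
    int totalMaxLength = Integer.BYTES + maxElements * Integer.BYTES + maxLength * maxElements;
    // 4 + 200 + 5000 = 5204 bytes reserved per row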
*/ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatisticsCollector { + private final Set<ByteArray> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE); private int _minLength = Integer.MAX_VALUE; @@ -36,22 +37,36 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics private ByteArray[] _sortedValues; private boolean _sealed = false; - public BytesColumnPredIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) { + public BytesColumnPredIndexStatsCollector(String column, + StatsCollectorConfig statsCollectorConfig) { super(column, statsCollectorConfig); } @Override public void collect(Object entry) { - ByteArray value = new ByteArray((byte[]) entry); - addressSorted(value); - updatePartition(value); - _values.add(value); - - int length = value.length(); - _minLength = Math.min(_minLength, length); - _maxLength = Math.max(_maxLength, length); - - _totalNumberOfEntries++; + if (entry instanceof Object[]) { + Object[] values = (Object[]) entry; + for (Object obj : values) { + ByteArray value = new ByteArray((byte[]) obj); + _values.add(value); + int length = value.length(); + _minLength = Math.min(_minLength, length); + _maxLength = Math.max(_maxLength, length); + } + _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length); + updateTotalNumberOfEntries(values); + } else { + ByteArray value = new ByteArray((byte[]) entry); + addressSorted(value); + updatePartition(value); + _values.add(value); + + int length = value.length(); + _minLength = Math.min(_minLength, length); + _maxLength = Math.max(_maxLength, length); + + _totalNumberOfEntries++; + } } @Override @@ -59,7 +74,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics if (_sealed) { return _sortedValues[0]; } - throw new IllegalStateException("you must seal the collector first before asking for min value"); + throw new IllegalStateException( + "you must seal the collector first before asking for min value"); } @Override @@ -67,7 +83,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics if (_sealed) { return _sortedValues[_sortedValues.length - 1]; } - throw new IllegalStateException("you must seal the collector first before asking for max value"); + throw new IllegalStateException( + "you must seal the collector first before asking for max value"); } @Override @@ -75,7 +92,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics if (_sealed) { return _sortedValues; } - throw new IllegalStateException("you must seal the collector first before asking for unique values set"); + throw new IllegalStateException( + "you must seal the collector first before asking for unique values set"); } @Override @@ -88,7 +106,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics if (_sealed) { return _maxLength; } - throw new IllegalStateException("you must seal the collector first before asking for longest value"); + throw new IllegalStateException( + "you must seal the collector first before asking for longest value"); } @Override @@ -96,7 +115,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics if (_sealed) { return _sortedValues.length; } - throw new IllegalStateException("you must seal the collector first before asking for cardinality"); + throw new IllegalStateException( + "you must seal the collector first before asking for cardinality"); } @Override diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java index aba55ca..71e62cb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.ColumnIndexDirectory; @@ -167,7 +168,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION; } } else { - fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION; + if (!columnMetadata.hasDictionary()) { + fileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION; + } else { + fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION; + } } break; case INVERTED_INDEX: diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java index 373c3a9..e98a185 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java @@ -39,7 +39,7 @@ public class MultiValueVarByteRawIndexCreatorTest { } @Test - public void testMV() throws IOException { + public void testMVString() throws IOException { String column = "testCol"; int numDocs = 1000; int maxElements = 50; @@ -78,4 +78,47 @@ public class MultiValueVarByteRawIndexCreatorTest { Assert.assertEquals(inputs.get(i), readValue); } } + + @Test + public void testMVBytes() throws IOException { + String column = "testCol"; + int numDocs = 1000; + int maxElements = 50; + int maxTotalLength = 500; + File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + file.delete(); + MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator( + new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES, + maxTotalLength, maxElements); + List<byte[][]> inputs = new ArrayList<>(); + Random random = new Random(); + for (int i = 0; i < numDocs; i++) { + //int length = 1; + int length = random.nextInt(10); + byte[][] values = new byte[length][]; + for (int j = 0; j < length; j++) { + char[] value = new char[length]; + Arrays.fill(value, 'a'); + values[j] = new String(value).getBytes(); + } + inputs.add(values); + creator.putBytesMV(values); + } + creator.close(); + + //read + final PinotDataBuffer buffer = PinotDataBuffer + .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); + VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer, + DataType.BYTES); + final ChunkReaderContext context = reader.createContext(); + byte[][] values = new 
byte[maxElements][]; + for (int i = 0; i < numDocs; i++) { + int length = reader.getBytesMV(i, values, context); + byte[][] readValue = Arrays.copyOf(values, length); + for (int j = 0; j < length; j++) { + Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j])); + } + } + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java index 9f515e8..b9f0f15 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; @@ -55,15 +57,16 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - /** * Class for testing Raw index creators. 
*/ public class RawIndexCreatorTest { + private static final int NUM_ROWS = 10009; private static final int MAX_STRING_LENGTH = 101; - private static final String SEGMENT_DIR_NAME = System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest"; + private static final String SEGMENT_DIR_NAME = + System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest"; private static final String SEGMENT_NAME = "testSegment"; private static final String INT_COLUMN = "intColumn"; @@ -71,6 +74,12 @@ public class RawIndexCreatorTest { private static final String FLOAT_COLUMN = "floatColumn"; private static final String DOUBLE_COLUMN = "doubleColumn"; private static final String STRING_COLUMN = "stringColumn"; + private static final String INT_MV_COLUMN = "intMVColumn"; + private static final String LONG_MV_COLUMN = "longMVColumn"; + private static final String FLOAT_MV_COLUMN = "floatMVColumn"; + private static final String DOUBLE_MV_COLUMN = "doubleMVColumn"; + private static final String STRING_MV_COLUMN = "stringMVColumn"; + private static final String BYTES_MV_COLUMN = "bytesMVColumn"; Random _random; private RecordReader _recordReader; @@ -79,8 +88,6 @@ public class RawIndexCreatorTest { /** * Setup to build a segment with raw indexes (no-dictionary) of various data types. - * - * @throws Exception */ @BeforeClass public void setup() @@ -91,8 +98,15 @@ public class RawIndexCreatorTest { schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT, true)); schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE, true)); schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING, true)); + schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT, false)); + schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG, false)); + schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT, false)); + schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE, false)); + schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING, false)); + schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES, false)); - TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test") + .build(); _random = new Random(System.nanoTime()); _recordReader = buildIndex(tableConfig, schema); @@ -109,7 +123,6 @@ public class RawIndexCreatorTest { /** * Test for int raw index creator. * Compares values read from the raw index against expected value. - * @throws Exception */ @Test public void testIntRawIndexCreator() @@ -120,7 +133,6 @@ public class RawIndexCreatorTest { /** * Test for long raw index creator. * Compares values read from the raw index against expected value. - * @throws Exception */ @Test public void testLongRawIndexCreator() @@ -131,7 +143,6 @@ public class RawIndexCreatorTest { /** * Test for float raw index creator. * Compares values read from the raw index against expected value. - * @throws Exception */ @Test public void testFloatRawIndexCreator() @@ -142,7 +153,6 @@ public class RawIndexCreatorTest { /** * Test for double raw index creator. * Compares values read from the raw index against expected value. - * @throws Exception */ @Test public void testDoubleRawIndexCreator() @@ -153,19 +163,21 @@ public class RawIndexCreatorTest { /** * Test for string raw index creator. * Compares values read from the raw index against expected value. 
- * @throws Exception */ @Test public void testStringRawIndexCreator() throws Exception { PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN); - try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer, + try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader( + indexBuffer, DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) { + BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + .createContext()) { _recordReader.rewind(); for (int row = 0; row < NUM_ROWS; row++) { GenericRow expectedRow = _recordReader.next(); - Assert.assertEquals(rawIndexReader.getString(row, readerContext), expectedRow.getValue(STRING_COLUMN)); + Assert.assertEquals(rawIndexReader.getString(row, readerContext), + expectedRow.getValue(STRING_COLUMN)); } } } @@ -175,17 +187,88 @@ public class RawIndexCreatorTest { * * @param column Column for which to perform the test * @param dataType Data type of the column - * @throws Exception */ private void testFixedLengthRawIndexCreator(String column, DataType dataType) throws Exception { PinotDataBuffer indexBuffer = getIndexBufferForColumn(column); - try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(indexBuffer, - dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) { + try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( + indexBuffer, + dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + .createContext()) { + _recordReader.rewind(); + for (int row = 0; row < NUM_ROWS; row++) { + GenericRow expectedRow = _recordReader.next(); + Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), + expectedRow.getValue(column)); + } + } + } + + /** + * Test for multi value string raw index creator. + * Compares values read from the raw index against expected value. + */ + @Test + public void testStringMVRawIndexCreator() + throws Exception { + PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN); + try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader( + indexBuffer, + DataType.STRING); + BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + .createContext()) { _recordReader.rewind(); + int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata() + .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues(); + final String[] valueBuffer = new String[maxNumberOfMultiValues]; for (int row = 0; row < NUM_ROWS; row++) { GenericRow expectedRow = _recordReader.next(); - Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), expectedRow.getValue(column)); + + int length = rawIndexReader.getStringMV(row, valueBuffer, readerContext); + String[] readValue = Arrays.copyOf(valueBuffer, length); + Object[] writtenValue = (Object[]) expectedRow.getValue(STRING_MV_COLUMN); + if (writtenValue == null || writtenValue.length == 0) { + writtenValue = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING}; + } + for (int i = 0; i < length; i++) { + Object expected = writtenValue[i]; + Object actual = readValue[i]; + Assert.assertEquals(expected, actual); + } + } + } + } + + /** + * Test for multi value string raw index creator. 
+ * Compares values read from the raw index against expected value. + */ + @Test + public void testBytesMVRawIndexCreator() + throws Exception { + PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN); + try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader( + indexBuffer, DataType.BYTES); + BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + .createContext()) { + _recordReader.rewind(); + int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata() + .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues(); + final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][]; + for (int row = 0; row < NUM_ROWS; row++) { + GenericRow expectedRow = _recordReader.next(); + + int length = rawIndexReader.getBytesMV(row, valueBuffer, readerContext); + byte[][] readValue = Arrays.copyOf(valueBuffer, length); + Object[] writtenValue = (Object[]) expectedRow.getValue(BYTES_MV_COLUMN); + if (writtenValue == null || writtenValue.length == 0) { + writtenValue = new byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES}; + } + for (int i = 0; i < length; i++) { + Object expected = writtenValue[i]; + Object actual = readValue[i]; + Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual)); + } } } } @@ -202,10 +285,10 @@ public class RawIndexCreatorTest { } /** - * Helper method to build a segment containing a single valued string column with RAW (no-dictionary) index. + * Helper method to build a segment containing a single valued string column with RAW + * (no-dictionary) index. * * @return Array of string values for the rows in the generated index. - * @throws Exception */ private RecordReader buildIndex(TableConfig tableConfig, Schema schema) throws Exception { @@ -221,9 +304,17 @@ public class RawIndexCreatorTest { for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { Object value; - - value = getRandomValue(_random, fieldSpec.getDataType()); - map.put(fieldSpec.getName(), value); + if (fieldSpec.isSingleValueField()) { + value = getRandomValue(_random, fieldSpec.getDataType()); + map.put(fieldSpec.getName(), value); + } else { + int length = _random.nextInt(50); + Object[] values = new Object[length]; + for (int j = 0; j < length; j++) { + values[j] = getRandomValue(_random, fieldSpec.getDataType()); + } + map.put(fieldSpec.getName(), values); + } } GenericRow genericRow = new GenericRow(); @@ -262,9 +353,15 @@ public class RawIndexCreatorTest { return random.nextDouble(); case STRING: return StringUtil - .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE); + .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), + Integer.MAX_VALUE); + case BYTES: + return StringUtil + .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), + Integer.MAX_VALUE).getBytes(); default: - throw new UnsupportedOperationException("Unsupported data type for random value generator: " + dataType); + throw new UnsupportedOperationException( + "Unsupported data type for random value generator: " + dataType); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java index d3e53141..279ae6d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java +++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java @@ -308,7 +308,7 @@ public class DictionaryToRawIndexConverter { int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1; try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator - .getRawIndexCreatorForColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry, + .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); ForwardIndexReaderContext readerContext = reader.createContext()) { switch (storedType) {
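
A few notes on the change set. The factory rename above (getRawIndexCreatorForColumn to getRawIndexCreatorForSVColumn) keeps the single-value path explicit now that multi-value raw forward indexes exist; DictionaryToRawIndexConverter is updated to match. On sizing: MultiValueFixedByteRawIndexCreator now budgets the chunk writer for the worst-case row, maxNumberOfMultiValueElements * maxLengthOfEachEntry bytes, rather than for a single entry, and the same budget drives the optional rows-per-chunk derivation. A minimal sketch of the idea, assuming an illustrative ~1MB target chunk and a made-up default row cap (the real constants live in the chunk writer, not here):

    // Sketch only: both constants are assumptions, not Pinot's actual values.
    final class ChunkSizingSketch {
      private static final int TARGET_CHUNK_BYTES = 1 << 20;       // assumed target
      private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;  // assumed default

      // Worst-case bytes for one fixed-byte multi-value row.
      static int maxRowBytes(int maxNumElements, int maxBytesPerElement) {
        return maxNumElements * maxBytesPerElement;
      }

      // Rows per chunk: the fixed default, or derived so that one chunk holds
      // roughly TARGET_CHUNK_BYTES of worst-case rows.
      static int numDocsPerChunk(int maxRowBytes, boolean derive) {
        return derive ? Math.max(1, TARGET_CHUNK_BYTES / maxRowBytes) : DEFAULT_NUM_DOCS_PER_CHUNK;
      }
    }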
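
The var-byte creator's budget, Integer.BYTES + maxElements * Integer.BYTES + maxLength * maxElements, falls straight out of the serialized row layout named in the code comment: an element count, one length per element, then the concatenated payloads. A self-contained illustration with java.nio.ByteBuffer (encodeRow and decodeRow are illustrative names, not Pinot API):

    import java.nio.ByteBuffer;

    final class MvRowLayoutSketch {
      // Layout: [numElements][len_0 .. len_{n-1}][bytes_0 .. bytes_{n-1}]
      static byte[] encodeRow(byte[][] values) {
        int payload = 0;
        for (byte[] v : values) {
          payload += v.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + values.length * Integer.BYTES + payload);
        buf.putInt(values.length);
        for (byte[] v : values) {
          buf.putInt(v.length);
        }
        for (byte[] v : values) {
          buf.put(v);
        }
        return buf.array();
      }

      static byte[][] decodeRow(byte[] row) {
        ByteBuffer buf = ByteBuffer.wrap(row);
        int numElements = buf.getInt();
        int[] lengths = new int[numElements];
        for (int i = 0; i < numElements; i++) {
          lengths[i] = buf.getInt();
        }
        byte[][] values = new byte[numElements][];
        for (int i = 0; i < numElements; i++) {
          values[i] = new byte[lengths[i]];
          buf.get(values[i]);
        }
        return values;
      }
    }

Writing the count and length array up front is what lets a reader return the element count and slice out payloads without scanning them.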
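
BytesColumnPredIndexStatsCollector.collect(Object) now dispatches on the runtime type, treating an Object[] as one multi-value row of byte[] elements: length bounds and distinct values are tracked per element, the row width feeds _maxNumberOfMultiValues, and (unlike the single-value branch) the sorted-order and partition bookkeeping is skipped. A condensed standalone sketch of the dispatch, with a HashSet of byte-preserving strings standing in for Pinot's ObjectOpenHashSet<ByteArray>:

    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.Set;

    final class BytesStatsSketch {
      private final Set<String> _values = new HashSet<>(); // stand-in for Set<ByteArray>
      private int _minLength = Integer.MAX_VALUE;
      private int _maxLength = 0;
      private int _maxNumberOfMultiValues = 0;
      private int _totalNumberOfEntries = 0;

      void collect(Object entry) {
        if (entry instanceof Object[]) { // multi-value row
          Object[] values = (Object[]) entry;
          for (Object obj : values) {
            collectSingle((byte[]) obj);
          }
          _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
          _totalNumberOfEntries += values.length; // assumes per-element entry counting
        } else { // single-value row
          collectSingle((byte[]) entry);
          _totalNumberOfEntries++;
        }
      }

      private void collectSingle(byte[] bytes) {
        _values.add(new String(bytes, StandardCharsets.ISO_8859_1)); // byte-preserving key
        _minLength = Math.min(_minLength, bytes.length);
        _maxLength = Math.max(_maxLength, bytes.length);
      }
    }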
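
FilePerIndexDirectory previously assumed every multi-value forward index is dictionary-encoded; the MV branch now mirrors the single-value one and resolves the raw extension for no-dictionary columns. A compact sketch of the decision (the extension strings here are assumptions; the canonical values live in V1Constants.Indexes):

    // Sketch of forward-index file extension selection.
    static String forwardIndexExtension(boolean singleValue, boolean sorted, boolean hasDictionary) {
      if (singleValue) {
        if (!hasDictionary) {
          return ".sv.raw.fwd";
        }
        return sorted ? ".sv.sorted.fwd" : ".sv.unsorted.fwd";
      }
      return hasDictionary ? ".mv.fwd" : ".mv.raw.fwd";
    }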
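
The MV read loops in both test classes share one pattern: size a reusable buffer from ColumnMetadata.getMaxNumberOfMultiValues(), let getBytesMV/getStringMV fill it and return the row's element count, and compare only the first length slots; rows written as empty arrays are expected back as the single default null value (e.g. FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES). The pattern, shown against a stand-in interface rather than the real VarByteChunkMVForwardIndexReader:

    import java.util.Arrays;
    import java.util.List;

    final class MvReadLoopSketch {
      // Stand-in for VarByteChunkMVForwardIndexReader.getBytesMV(docId, buffer, context).
      interface MvReader {
        int getBytesMV(int docId, byte[][] buffer);
      }

      static void verify(MvReader reader, List<byte[][]> expected, int maxNumValuesPerRow) {
        byte[][] buffer = new byte[maxNumValuesPerRow][]; // reused across all rows
        for (int docId = 0; docId < expected.size(); docId++) {
          int length = reader.getBytesMV(docId, buffer); // fills buffer, returns count
          byte[][] actual = Arrays.copyOf(buffer, length);
          for (int i = 0; i < length; i++) {
            if (!Arrays.equals(expected.get(docId)[i], actual[i])) {
              throw new AssertionError("Mismatch at doc " + docId + ", element " + i);
            }
          }
        }
      }
    }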