somandal commented on code in PR #9510:
URL: https://github.com/apache/pinot/pull/9510#discussion_r994866935
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -106,7 +105,8 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
       throws Exception {
     Map<String, Operation> columnOperationMap = new HashMap<>();
-    // Does not work for segment versions < V3
+    // Does not work for segment versions < V3.
+    // TODO: Remove this limitation.

Review Comment:
   nit: move this TODO to the javadocs part?

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    } else {
+      rewriteRawMVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    }
+  }
+
+  private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+    Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+    Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column.
+    ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+    int numDocs = existingColMetadata.getTotalDocs();
+
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+      Preconditions.checkState(lengthOfLongestEntry >= 0,
+          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
+      int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
+      int maxRowLengthInBytes = lengthOfLongestEntry - (Integer.BYTES * maxNumberOfMVEntries) - Integer.BYTES;

Review Comment:
   This code is not at all obvious. Can you explain this part here? For example, why are you subtracting `(Integer.BYTES * maxNumberOfMVEntries)`?

   For `MultiValueVarByteRawIndexCreator`, I see why this works, and that the value actually gets passed to the forward index creator and used:
   ```
   int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
   int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes;
   ```
   For `MultiValueFixedByteRawIndexCreator`, they calculate it as:
   ```
   int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
   ```
   The `maxRowLengthInBytes` isn't passed to this forward index creator, though, so it is never used there. Can you perhaps add a comment explaining why it's okay to set it up this way (i.e., that it's only used by the variable-length types)?
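To make the arithmetic in the comment above concrete, here is a small standalone Java sketch that reproduces the handler's subtraction and the two quoted creator formulas. The class and method names (`MvRowSizing`, `maxRowLengthInBytes`, `varByteTotalMaxLength`, `fixedByteTotalMaxLength`) are hypothetical, and the sketch assumes, as the PR's subtraction implies, that `lengthOfLongestEntry` already counts a 4-byte number-of-values header plus a 4-byte length prefix per value.

```java
// Hypothetical sketch reproducing the size arithmetic quoted in the review; not Pinot code.
// Assumption: lengthOfLongestEntry = header (4 bytes) + per-value length prefixes (4 bytes each) + payload.
public class MvRowSizing {
  // Mirrors the PR: strip the per-value length prefixes and the number-of-values header.
  static int maxRowLengthInBytes(int lengthOfLongestEntry, int maxNumberOfMVEntries) {
    return lengthOfLongestEntry - (Integer.BYTES * maxNumberOfMVEntries) - Integer.BYTES;
  }

  // Mirrors the quoted MultiValueVarByteRawIndexCreator formula: adds both terms back.
  static int varByteTotalMaxLength(int maxRowLengthInBytes, int maxNumberOfElements) {
    int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
    return Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes;
  }

  // Mirrors the quoted MultiValueFixedByteRawIndexCreator formula: maxRowLengthInBytes is not an input.
  static int fixedByteTotalMaxLength(int maxNumberOfMultiValueElements, int storedTypeSize) {
    return Integer.BYTES + (maxNumberOfMultiValueElements * storedTypeSize);
  }

  public static void main(String[] args) {
    int lengthOfLongestEntry = 108; // illustrative value
    int maxNumberOfMVEntries = 10;  // illustrative value
    int maxRow = maxRowLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries);
    System.out.println("maxRowLengthInBytes = " + maxRow); // 108 - 40 - 4 = 64
    System.out.println("var-byte totalMaxLength = "
        + varByteTotalMaxLength(maxRow, maxNumberOfMVEntries)); // 4 + 64 + 40 = 108
    System.out.println("fixed-byte totalMaxLength (INT) = "
        + fixedByteTotalMaxLength(maxNumberOfMVEntries, Integer.BYTES)); // 4 + 10 * 4 = 44
  }
}
```

Algebraically, the var-byte creator adds back exactly the two terms the handler subtracts, so its `totalMaxLength` equals `lengthOfLongestEntry` for any `maxNumberOfMVEntries`, while the fixed-byte formula never reads `maxRowLengthInBytes`. That is one possible wording for the code comment the reviewer asks for.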
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    } else {
+      rewriteRawMVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    }
+  }
+
+  private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+    Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+    Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column.
+    ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+    int numDocs = existingColMetadata.getTotalDocs();
+
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+      Preconditions.checkState(lengthOfLongestEntry >= 0,
+          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);

Review Comment:
   nit: column={} -> column=

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java:
##########
@@ -95,26 +99,92 @@ public Object getValue(int docId) {
         return values;
       }
     } else {
-      // NOTE: Only support single-value raw index
-      assert _forwardIndexReader.isSingleValue();
-
-      switch (_forwardIndexReader.getStoredType()) {
-        case INT:
-          return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
-        case LONG:
-          return _forwardIndexReader.getLong(docId, _forwardIndexReaderContext);
-        case FLOAT:
-          return _forwardIndexReader.getFloat(docId, _forwardIndexReaderContext);
-        case DOUBLE:
-          return _forwardIndexReader.getDouble(docId, _forwardIndexReaderContext);
-        case BIG_DECIMAL:
-          return _forwardIndexReader.getBigDecimal(docId, _forwardIndexReaderContext);
-        case STRING:
-          return _forwardIndexReader.getString(docId, _forwardIndexReaderContext);
-        case BYTES:
-          return _forwardIndexReader.getBytes(docId, _forwardIndexReaderContext);
-        default:
-          throw new IllegalStateException();
+      if (_forwardIndexReader.isSingleValue()) {
+        switch (_forwardIndexReader.getStoredType()) {
+          case INT:
+            return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
+          case LONG:
+            return _forwardIndexReader.getLong(docId, _forwardIndexReaderContext);
+          case FLOAT:
+            return _forwardIndexReader.getFloat(docId, _forwardIndexReaderContext);
+          case DOUBLE:
+            return _forwardIndexReader.getDouble(docId, _forwardIndexReaderContext);
+          case BIG_DECIMAL:
+            return _forwardIndexReader.getBigDecimal(docId, _forwardIndexReaderContext);
+          case STRING:
+            return _forwardIndexReader.getString(docId, _forwardIndexReaderContext);
+          case BYTES:
+            return _forwardIndexReader.getBytes(docId, _forwardIndexReaderContext);
+          default:
+            throw new IllegalStateException();
+        }
+      } else {
+        Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, "maxNumValuesPerMVEntry is negative for an MV column.");
+
+        switch (_forwardIndexReader.getStoredType()) {

Review Comment:
   Thanks for adding this. I do see value in keeping this code even if you decide either to add more efficient functions that directly fetch the typed array or to use the `ForwardIndexReader` directly.

##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -81,15 +81,22 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
   private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
+  // Dictionary columns
   private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
   private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
   private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
-  private static final String METRIC_PASSTHROUGH_INTEGER = "METRIC_PASSTHROUGH_INTEGER";
+  // Metric columns
+  private static final String METRIC_PASS_THROUGH_INTEGER = "METRIC_PASS_THROUGH_INTEGER";
   private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
   private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER";
   private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
+  // Multivalue columns
+  private static final String DIM_MV_PASS_THROUGH_INTEGER = "DIM_MV_PASS_THROUGH_INTEGER";

Review Comment:
   nit: it would be good to add a column and a test for `LONG` or `FLOAT` type columns too.
   This is because they differ from `INTEGER` (`LONG` in size, `FLOAT` in stored type), and in the past, testing these helped me find bugs.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();

Review Comment:
   nit: I see you fetch the column metadata again inside `rewriteRawMVForwardIndex`; perhaps just pass it into these functions instead?

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java:
##########
@@ -95,26 +99,92 @@ public Object getValue(int docId) {
         return values;
       }
     } else {
-      // NOTE: Only support single-value raw index
-      assert _forwardIndexReader.isSingleValue();
-
-      switch (_forwardIndexReader.getStoredType()) {
-        case INT:
-          return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
-        case LONG:
-          return _forwardIndexReader.getLong(docId, _forwardIndexReaderContext);
-        case FLOAT:
-          return _forwardIndexReader.getFloat(docId, _forwardIndexReaderContext);
-        case DOUBLE:
-          return _forwardIndexReader.getDouble(docId, _forwardIndexReaderContext);
-        case BIG_DECIMAL:
-          return _forwardIndexReader.getBigDecimal(docId, _forwardIndexReaderContext);
-        case STRING:
-          return _forwardIndexReader.getString(docId, _forwardIndexReaderContext);
-        case BYTES:
-          return _forwardIndexReader.getBytes(docId, _forwardIndexReaderContext);
-        default:
-          throw new IllegalStateException();
+      if (_forwardIndexReader.isSingleValue()) {
+        switch (_forwardIndexReader.getStoredType()) {
+          case INT:
+            return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
+          case LONG:
+            return _forwardIndexReader.getLong(docId, _forwardIndexReaderContext);
+          case FLOAT:
+            return _forwardIndexReader.getFloat(docId, _forwardIndexReaderContext);
+          case DOUBLE:
+            return _forwardIndexReader.getDouble(docId, _forwardIndexReaderContext);
+          case BIG_DECIMAL:
+            return _forwardIndexReader.getBigDecimal(docId, _forwardIndexReaderContext);
+          case STRING:
+            return _forwardIndexReader.getString(docId, _forwardIndexReaderContext);
+          case BYTES:
+            return _forwardIndexReader.getBytes(docId, _forwardIndexReaderContext);
+          default:
+            throw new IllegalStateException();
+        }
+      } else {
+        Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, "maxNumValuesPerMVEntry is negative for an MV column.");

Review Comment:
   Can you add this check in the constructor instead, where you fetch this value for MV columns? It should not be negative here, since you're checking the same `isSingleValue()` flag in your if/else blocks.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    } else {
+      rewriteRawMVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    }
+  }
+
+  private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+    Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+    Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column.

Review Comment:
   nit: move this comment above the Preconditions check.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    } else {
+      rewriteRawMVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    }
+  }
+
+  private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+    Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+    Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column.
+    ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+    int numDocs = existingColMetadata.getTotalDocs();
+
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+      Preconditions.checkState(lengthOfLongestEntry >= 0,
+          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
+      int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
+      int maxRowLengthInBytes = lengthOfLongestEntry - (Integer.BYTES * maxNumberOfMVEntries) - Integer.BYTES;
+
+      IndexCreationContext.Forward context =
+          IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build()
+              .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
+      try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If creator stored type and the reader stored type do not match, throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
+        }
+
+        PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumberOfMVEntries);
+
+        for (int i = 0; i < numDocs; i++) {
+          Object[] values = (Object[]) columnReader.getValue(i);

Review Comment:
   I'm wondering whether it would be better to expose `getIntMV()` and the similar functions in the `columnReader`, or to use the `ForwardIndexReader` directly, instead of doing it this way. Your code makes two copies: first to construct the `Object[] values`, and then to construct the array of the actual value type. The `ForwardIndexReader` directly provides APIs to get the array in the desired type:
   ```
   default int getIntMV(int docId, int[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }

   default int[] getIntMV(int docId, T context) {
     throw new UnsupportedOperationException();
   }
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
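The last review comment above suggests reading MV values through the typed `getIntMV` APIs instead of going through `Object[]`. The standalone Java sketch below contrasts the two paths. `SimpleIntMvReader` is a hypothetical in-memory stand-in, not Pinot's `ForwardIndexReader`; its `getIntMV(int docId)` only mimics the shape of the quoted `getIntMV(int docId, T context)` default method, without the context argument.

```java
import java.util.Arrays;

// Hypothetical sketch: an in-memory stand-in for a raw MV INT forward index, not Pinot's ForwardIndexReader.
public class MvReadPaths {
  static final class SimpleIntMvReader {
    private final int[][] rows;

    SimpleIntMvReader(int[][] rows) {
      this.rows = rows;
    }

    // Mimics the shape of getIntMV(int docId, T context) minus the context: one copy, no boxing.
    int[] getIntMV(int docId) {
      return rows[docId].clone();
    }

    // Mimics a generic getValue(docId) that returns MV values as Object[]: one copy plus boxing.
    Object[] getValue(int docId) {
      int[] row = rows[docId];
      Object[] values = new Object[row.length];
      for (int i = 0; i < row.length; i++) {
        values[i] = row[i]; // autoboxed to Integer
      }
      return values;
    }
  }

  // Pattern under review: build Object[] first, then copy again into an int[].
  static int[] viaObjectArray(SimpleIntMvReader reader, int docId) {
    Object[] values = reader.getValue(docId);
    int[] ints = new int[values.length];
    for (int i = 0; i < values.length; i++) {
      ints[i] = (Integer) values[i]; // unboxing, second copy
    }
    return ints;
  }

  // Reviewer's suggestion: ask the reader for the typed array directly.
  static int[] viaTypedArray(SimpleIntMvReader reader, int docId) {
    return reader.getIntMV(docId);
  }

  public static void main(String[] args) {
    SimpleIntMvReader reader = new SimpleIntMvReader(new int[][]{{1, 2, 3}, {}, {42}});
    for (int docId = 0; docId < 3; docId++) {
      int[] boxedPath = viaObjectArray(reader, docId);
      int[] typedPath = viaTypedArray(reader, docId);
      System.out.println(Arrays.toString(typedPath) + " same=" + Arrays.equals(boxedPath, typedPath));
    }
  }
}
```

Both paths yield the same values; the typed one skips the boxed `Object[]` and the second copy. Since the quoted defaults throw `UnsupportedOperationException`, this approach only works with readers that override them for the column's stored type.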