siddharthteotia commented on code in PR #9868: URL: https://github.com/apache/pinot/pull/9868#discussion_r1035470342
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -677,4 +836,168 @@ private void updateMetadataProperties(File indexDir, Map<String, String> metadat properties.save(); } + + private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter, + IndexCreatorProvider indexCreatorProvider) + throws Exception { + ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column); + boolean isSingleValue = existingColMetadata.isSingleValue(); + + File indexDir = _segmentMetadata.getIndexDir(); + String segmentName = _segmentMetadata.getName(); + File inProgress = new File(indexDir, column + ".fwd.inprogress"); + String fileExtension = isSingleValue ? V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION; + File fwdIndexFile = new File(indexDir, column + fileExtension); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run ended normally. + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run was interrupted. + // Remove forward index if exists. + FileUtils.deleteQuietly(fwdIndexFile); + } + + LOGGER.info("Creating raw forward index for segment={} and column={}", segmentName, column); + rewriteDictToRawForwardIndex(column, existingColMetadata, segmentWriter, indexDir, indexCreatorProvider); + + // Remove dictionary and forward index + segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX); + segmentWriter.removeIndex(column, ColumnIndexType.DICTIONARY); + LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX); + + LOGGER.info("Created raw forwardIndex. 
Updating metadata properties for segment={} and column={}", segmentName, + column); + Map<String, String> metadataProperties = new HashMap<>(); + metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false)); + metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)); + updateMetadataProperties(indexDir, metadataProperties); + + // Remove range index, inverted index and FST index. + removeDictRelatedIndexes(column, segmentWriter); + + // Delete marker file. + FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created raw based forward index for segment: {}, column: {}", segmentName, column); + } + + private void rewriteDictToRawForwardIndex(String column, ColumnMetadata existingColMetadata, + SegmentDirectory.Writer segmentWriter, File indexDir, IndexCreatorProvider indexCreatorProvider) + throws Exception { + try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { + Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, existingColMetadata); + // lengthOfLongestEntry not available for dict columns. Should be computed by reading all values in dictionary. + IndexCreationContext.Builder builder = + IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata); + builder.withDictionary(false); + + if (existingColMetadata.isSingleValue()) { + int lengthOfLongestEntry = getLengthOfLongestEntryFromDictionary(column, reader, dictionary); + builder.withLengthOfLongestEntry(lengthOfLongestEntry); + } else { + FieldSpec.DataType dataType = existingColMetadata.getDataType(); + boolean isFixedWidth = dataType.getStoredType().isFixedWidth(); + + if (!isFixedWidth) { + // For variable length stored types, maxRowLengthInBytes is required to create the forwardIndexCreator. + // This can only be determined by reading the entire MV forward index. 
+ int maxRowLength = getMaxRowLengthForMVColumn(column, reader, dictionary); + builder.withMaxRowLengthInBytes(maxRowLength); + } + } + + Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs(); + ChunkCompressionType compressionType; + if (compressionConfigs.containsKey(column)) { + compressionType = compressionConfigs.get(column); + } else { + compressionType = SegmentColumnarIndexCreator.getDefaultCompressionType(existingColMetadata.getFieldType()); + } + + IndexCreationContext.Forward context = + builder.build().forForwardIndex(compressionType, _indexLoadingConfig.getColumnProperties()); + try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) { + int numDocs = existingColMetadata.getTotalDocs(); + forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, dictionary); + } + } + } + + private int getMaxRowLengthForMVColumn(String column, ForwardIndexReader reader, Dictionary dictionary) + throws Exception { + ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column); + AbstractColumnStatisticsCollector statsCollector = Review Comment: Probably nothing wrong in theory, but I wonder why we need to go through the StatsCollector framework to gather this info. We can simply read the forward index to collect this, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org