vvivekiyer commented on code in PR #9510: URL: https://github.com/apache/pinot/pull/9510#discussion_r996110231
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -220,54 +252,161 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme throw new UnsupportedOperationException(failureMsg); } - PinotSegmentColumnReader columnReader = - new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues()); - - for (int i = 0; i < numDocs; i++) { - Object val = columnReader.getValue(i); - - // JSON fields are either stored as string or bytes. No special handling is needed because we make this - // decision based on the storedType of the reader. - switch (reader.getStoredType()) { - case INT: - creator.putInt((int) val); - break; - case LONG: - creator.putLong((long) val); - break; - case FLOAT: - creator.putFloat((float) val); - break; - case DOUBLE: - creator.putDouble((double) val); - break; - case STRING: - creator.putString((String) val); - break; - case BYTES: - creator.putBytes((byte[]) val); - break; - case BIG_DECIMAL: - creator.putBigDecimal((BigDecimal) val); - break; - default: - throw new IllegalStateException(); - } - } + int numDocs = existingColMetadata.getTotalDocs(); + forwardIndexWriterHelper(column, reader, creator, numDocs, maxNumberOfMVEntries); } } + } - // We used the existing forward index to generate a new forward index. The existing forward index will be in V3 - // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed - // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The - // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is - // called during segmentWriter.close(). - segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX); - LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX); + private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColMetadata, File indexDir, + SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider, + ChunkCompressionType newCompressionType) + throws Exception { + try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { + int lengthOfLongestEntry = reader.getLengthOfLongestEntry(); - // Delete the marker file. - FileUtils.deleteQuietly(inProgress); + IndexCreationContext.Forward context = + IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata) + .withLengthOfLongestEntry(lengthOfLongestEntry).build() + .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties()); - LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column); + try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) { + // If creator stored type and the reader stored type do not match, throw an exception. + if (!reader.getStoredType().equals(creator.getValueType())) { + String failureMsg = + "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType() + .toString() + " to " + creator.getValueType().toString(); + throw new UnsupportedOperationException(failureMsg); + } + + int numDocs = existingColMetadata.getTotalDocs(); + forwardIndexWriterHelper(column, reader, creator, numDocs, -1); + } + } + } + + private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator, + int numDocs, int maxNumberOfMVEntries) { + // If creator stored type should match reader stored type. We do not support changing datatypes. + if (!reader.getStoredType().equals(creator.getValueType())) { + String failureMsg = + "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString() + + " to " + creator.getValueType().toString(); + throw new UnsupportedOperationException(failureMsg); + } + + ForwardIndexReaderContext readerContext = reader.createContext(); + boolean isSVColumn = reader.isSingleValue(); + + for (int i = 0; i < numDocs; i++) { + // JSON fields are either stored as string or bytes. No special handling is needed because we make this + // decision based on the storedType of the reader. + switch (reader.getStoredType()) { + case INT: { + if (isSVColumn) { + int val = reader.getInt(i, readerContext); + creator.putInt(val); + } else { + int[] buffer = new int[maxNumberOfMVEntries]; + int length = reader.getIntMV(i, buffer, readerContext); Review Comment: Nice, I didn't know about it. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org