somandal commented on code in PR #9510:
URL: https://github.com/apache/pinot/pull/9510#discussion_r995950788
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -220,54 +252,161 @@ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segme
         throw new UnsupportedOperationException(failureMsg);
       }
 
-      PinotSegmentColumnReader columnReader =
-          new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
-
-      for (int i = 0; i < numDocs; i++) {
-        Object val = columnReader.getValue(i);
-
-        // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-        // decision based on the storedType of the reader.
-        switch (reader.getStoredType()) {
-          case INT:
-            creator.putInt((int) val);
-            break;
-          case LONG:
-            creator.putLong((long) val);
-            break;
-          case FLOAT:
-            creator.putFloat((float) val);
-            break;
-          case DOUBLE:
-            creator.putDouble((double) val);
-            break;
-          case STRING:
-            creator.putString((String) val);
-            break;
-          case BYTES:
-            creator.putBytes((byte[]) val);
-            break;
-          case BIG_DECIMAL:
-            creator.putBigDecimal((BigDecimal) val);
-            break;
-          default:
-            throw new IllegalStateException();
-        }
-      }
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs, maxNumberOfMVEntries);
       }
     }
+  }
 
-    // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
-    // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
-    // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
-    // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
-    // called during segmentWriter.close().
-    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
-    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
+  private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider,
+      ChunkCompressionType newCompressionType)
+      throws Exception {
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
 
-    // Delete the marker file.
-    FileUtils.deleteQuietly(inProgress);
+      IndexCreationContext.Forward context =
+          IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
 
-    LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
+      try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If creator stored type and the reader stored type do not match, throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
+        }
+
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs, -1);
+      }
+    }
+  }
+
+  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
+      int numDocs, int maxNumberOfMVEntries) {
+    // The creator stored type should match the reader stored type. We do not support changing datatypes.
+    if (!reader.getStoredType().equals(creator.getValueType())) {
+      String failureMsg =
+          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
+              .toString() + " to " + creator.getValueType().toString();
+      throw new UnsupportedOperationException(failureMsg);
+    }
+
+    ForwardIndexReaderContext readerContext = reader.createContext();
+    boolean isSVColumn = reader.isSingleValue();
+
+    for (int i = 0; i < numDocs; i++) {
+      // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+      // decision based on the storedType of the reader.
+      switch (reader.getStoredType()) {
+        case INT: {
+          if (isSVColumn) {
+            int val = reader.getInt(i, readerContext);
+            creator.putInt(val);
+          } else {
+            int[] buffer = new int[maxNumberOfMVEntries];
+            int length = reader.getIntMV(i, buffer, readerContext);

Review Comment:
   Any specific reason to use this API over the following?
   ```
   /**
    * Reads the INT type multi-value at the given document id.
    *
    * @param docId Document id
    * @param context Reader context
    * @return INT values at the given document id
    */
   default int[] getIntMV(int docId, T context) {
     throw new UnsupportedOperationException();
   }
   ```
   An example implementation of the above:
   ```
   @Override
   public int[] getIntMV(int docId, ChunkReaderContext context) {
     ByteBuffer byteBuffer = slice(docId, context);
     int numValues = byteBuffer.getInt();
     int[] valueBuffer = new int[numValues];
     for (int i = 0; i < numValues; i++) {
       valueBuffer[i] = byteBuffer.getInt();
     }
     return valueBuffer;
   }
   ```
   This directly returns the `int[]` buffer with the correct size and elements filled in.
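   To make the suggestion concrete, here is a minimal sketch of how the INT branch of `forwardIndexWriterHelper` could look with the array-returning overload. This is illustrative only, not the PR's final code; `creator.putIntMV(values)` is assumed here as the write-side counterpart, since the diff above is truncated before the corresponding put call.
   ```
   case INT: {
     if (isSVColumn) {
       creator.putInt(reader.getInt(i, readerContext));
     } else {
       // The reader allocates an int[] sized to the exact number of values in
       // this document, so the caller no longer needs maxNumberOfMVEntries.
       int[] values = reader.getIntMV(i, readerContext);
       // Assumed write-side call; the diff is cut off before the put.
       creator.putIntMV(values);
     }
     break;
   }
   ```
   The trade-off is one `int[]` allocation per document versus a caller-managed buffer that could in principle be reused across documents; since the current diff allocates `buffer` inside the loop anyway, the simpler overload costs nothing extra here.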
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org