vvivekiyer commented on code in PR #9510:
URL: https://github.com/apache/pinot/pull/9510#discussion_r995568614


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -173,7 +168,143 @@ private void rewriteRawForwardIndex(String column, 
SegmentDirectory.Writer segme
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == 
SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = 
_segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    } else {
+      rewriteRawMVForwardIndex(column, segmentWriter, indexCreatorProvider);
+    }
+  }
+
+  private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer 
segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    ColumnMetadata existingColMetadata = 
_segmentMetadata.getColumnMetadataFor(column);
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".fwd.inprogress");
+    File fwdIndexFile = new File(indexDir, column + 
V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(fwdIndexFile);
+    }
+
+    LOGGER.info("Creating new forward index for segment={} and column={}", 
segmentName, column);
+     Map<String, ChunkCompressionType> compressionConfigs = 
_indexLoadingConfig.getCompressionConfigs();
+     Preconditions.checkState(compressionConfigs.containsKey(column));
+    // At this point, compressionConfigs is guaranteed to contain the column.
+    ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+    int numDocs = existingColMetadata.getTotalDocs();
+
+    try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+      Preconditions.checkState(lengthOfLongestEntry >= 0,
+          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + 
" column={}" + column);
+      int maxNumberOfMVEntries = 
existingColMetadata.getMaxNumberOfMultiValues();
+      int maxRowLengthInBytes = lengthOfLongestEntry - (Integer.BYTES * 
maxNumberOfMVEntries) - Integer.BYTES;
+
+      IndexCreationContext.Forward context =
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build()
+              .forForwardIndex(newCompressionType, 
_indexLoadingConfig.getColumnProperties());
 
+      try (ForwardIndexCreator creator = 
indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If creator stored type and the reader stored type do not match, 
throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column 
+ " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
+        }
+
+        PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(reader, null, null, maxNumberOfMVEntries);
+
+        for (int i = 0; i < numDocs; i++) {
+          Object[] values = (Object[]) columnReader.getValue(i);

Review Comment:
   Good point. I made this change and also refactored the code quite a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to