vvivekiyer commented on code in PR #9510:
URL: https://github.com/apache/pinot/pull/9510#discussion_r996110231


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -220,54 +252,161 @@ private void rewriteRawForwardIndex(String column, 
SegmentDirectory.Writer segme
           throw new UnsupportedOperationException(failureMsg);
         }
 
-        PinotSegmentColumnReader columnReader =
-            new PinotSegmentColumnReader(reader, null, null, 
existingColMetadata.getMaxNumberOfMultiValues());
-
-        for (int i = 0; i < numDocs; i++) {
-          Object val = columnReader.getValue(i);
-
-          // JSON fields are either stored as string or bytes. No special 
handling is needed because we make this
-          // decision based on the storedType of the reader.
-          switch (reader.getStoredType()) {
-            case INT:
-              creator.putInt((int) val);
-              break;
-            case LONG:
-              creator.putLong((long) val);
-              break;
-            case FLOAT:
-              creator.putFloat((float) val);
-              break;
-            case DOUBLE:
-              creator.putDouble((double) val);
-              break;
-            case STRING:
-              creator.putString((String) val);
-              break;
-            case BYTES:
-              creator.putBytes((byte[]) val);
-              break;
-            case BIG_DECIMAL:
-              creator.putBigDecimal((BigDecimal) val);
-              break;
-            default:
-              throw new IllegalStateException();
-          }
-        }
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs, 
maxNumberOfMVEntries);
       }
     }
+  }
 
-    // We used the existing forward index to generate a new forward index. The 
existing forward index will be in V3
-    // format and the new forward index will be in V1 format. Remove the 
existing forward index as it is not needed
-    // anymore. Note that removeIndex() will only mark an index for removal 
and remove the in-memory state. The
-    // actual cleanup from columns.psf file will happen when 
singleFileIndexDirectory.cleanupRemovedIndices() is
-    // called during segmentWriter.close().
-    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
-    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, 
ColumnIndexType.FORWARD_INDEX);
+  private void rewriteRawSVForwardIndex(String column, ColumnMetadata 
existingColMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider 
indexCreatorProvider,
+      ChunkCompressionType newCompressionType)
+      throws Exception {
+    try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
 
-    // Delete the marker file.
-    FileUtils.deleteQuietly(inProgress);
+      IndexCreationContext.Forward context =
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              .forForwardIndex(newCompressionType, 
_indexLoadingConfig.getColumnProperties());
 
-    LOGGER.info("Created forward index for segment: {}, column: {}", 
segmentName, column);
+      try (ForwardIndexCreator creator = 
indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If creator stored type and the reader stored type do not match, 
throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column 
+ " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
+        }
+
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs, -1);
+      }
+    }
+  }
+
+  private void forwardIndexWriterHelper(String column, ForwardIndexReader 
reader, ForwardIndexCreator creator,
+      int numDocs, int maxNumberOfMVEntries) {
+    // The creator stored type should match the reader stored type. We do not support changing datatypes.
+    if (!reader.getStoredType().equals(creator.getValueType())) {
+      String failureMsg =
+          "Unsupported operation to change datatype for column=" + column + " 
from " + reader.getStoredType().toString()
+              + " to " + creator.getValueType().toString();
+      throw new UnsupportedOperationException(failureMsg);
+    }
+
+    ForwardIndexReaderContext readerContext = reader.createContext();
+    boolean isSVColumn = reader.isSingleValue();
+
+    for (int i = 0; i < numDocs; i++) {
+      // JSON fields are either stored as string or bytes. No special handling 
is needed because we make this
+      // decision based on the storedType of the reader.
+      switch (reader.getStoredType()) {
+        case INT: {
+          if (isSVColumn) {
+            int val = reader.getInt(i, readerContext);
+            creator.putInt(val);
+          } else {
+            int[] buffer = new int[maxNumberOfMVEntries];
+            int length = reader.getIntMV(i, buffer, readerContext);

Review Comment:
   Nice, I didn't know about it. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to