siddharthteotia commented on code in PR #9868:
URL: https://github.com/apache/pinot/pull/9868#discussion_r1035175557


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -400,117 +436,262 @@ private void rewriteRawSVForwardIndex(String column, 
ColumnMetadata existingColM
         }
 
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, 
numDocs, null);
+        forwardIndexRewriteHelper(column, existingColMetadata, reader, 
creator, numDocs, null, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ColumnMetadata 
existingColumnMetadata, ForwardIndexReader reader,
-      ForwardIndexCreator creator, int numDocs, @Nullable 
SegmentDictionaryCreator dictionaryCreator) {
+  private void forwardIndexRewriteHelper(String column, ColumnMetadata 
existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs,
+      @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable 
Dictionary dictionaryReader) {
+    if (dictionaryReader == null && dictionaryCreator == null) {
+      // Read raw forward index and write raw forward index.
+      forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, 
reader, creator, numDocs);
+    } else if (dictionaryReader != null && dictionaryCreator == null) {
+      // Read dictionary based forward index and write raw forward index.
+      forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, 
reader, creator, numDocs, dictionaryReader);
+    } else if (dictionaryReader == null && dictionaryCreator != null) {
+      // Read raw forward index and write dictionary based forward index.
+      forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, 
reader, creator, numDocs, dictionaryCreator);
+    } else {
+      Preconditions.checkState(false, "Invalid dict-based read/write for 
column=" + column);
+    }
+  }
+
+  private void forwardIndexReadRawWriteRawHelper(String column, ColumnMetadata 
existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
 
-    if (dictionaryCreator != null) {
-      int maxNumValuesPerEntry = 
existingColumnMetadata.getMaxNumberOfMultiValues();
-      PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
-
-      for (int i = 0; i < numDocs; i++) {
-        Object obj = columnReader.getValue(i);
-
-        if (isSVColumn) {
-          int dictId = dictionaryCreator.indexOfSV(obj);
-          creator.putDictId(dictId);
-        } else {
-          int[] dictIds = dictionaryCreator.indexOfMV(obj);
-          creator.putDictIdMV(dictIds);
+    switch (reader.getStoredType()) {
+      // JSON fields are either stored as string or bytes. No special handling 
is needed because we make this
+      // decision based on the storedType of the reader.
+      case INT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            int val = reader.getInt(i, readerContext);
+            creator.putInt(val);
+          } else {
+            int[] ints = reader.getIntMV(i, readerContext);
+            creator.putIntMV(ints);
+          }
         }
+        break;
       }
-    } else {
-      switch (reader.getStoredType()) {
-        // JSON fields are either stored as string or bytes. No special 
handling is needed because we make this
-        // decision based on the storedType of the reader.
-        case INT: {
-          for (int i = 0; i < numDocs; i++) {
-            if (isSVColumn) {
-              int val = reader.getInt(i, readerContext);
-              creator.putInt(val);
-            } else {
-              int[] ints = reader.getIntMV(i, readerContext);
-              creator.putIntMV(ints);
-            }
+      case LONG: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            long val = reader.getLong(i, readerContext);
+            creator.putLong(val);
+          } else {
+            long[] longs = reader.getLongMV(i, readerContext);
+            creator.putLongMV(longs);
           }
-          break;
         }
-        case LONG: {
-          for (int i = 0; i < numDocs; i++) {
-            if (isSVColumn) {
-              long val = reader.getLong(i, readerContext);
-              creator.putLong(val);
-            } else {
-              long[] longs = reader.getLongMV(i, readerContext);
-              creator.putLongMV(longs);
+        break;
+      }
+      case FLOAT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            float val = reader.getFloat(i, readerContext);
+            creator.putFloat(val);
+          } else {
+            float[] floats = reader.getFloatMV(i, readerContext);
+            creator.putFloatMV(floats);
+          }
+        }
+        break;
+      }
+      case DOUBLE: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            double val = reader.getDouble(i, readerContext);
+            creator.putDouble(val);
+          } else {
+            double[] doubles = reader.getDoubleMV(i, readerContext);
+            creator.putDoubleMV(doubles);
+          }
+        }
+        break;
+      }
+      case STRING: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            String val = reader.getString(i, readerContext);
+            creator.putString(val);
+          } else {
+            String[] strings = reader.getStringMV(i, readerContext);
+            creator.putStringMV(strings);
+          }
+        }
+        break;
+      }
+      case BYTES: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            byte[] val = reader.getBytes(i, readerContext);
+            creator.putBytes(val);
+          } else {
+            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+            creator.putBytesMV(bytesArray);
+          }
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        Preconditions.checkState(isSVColumn, "BigDecimal is not supported for 
MV columns");
+        for (int i = 0; i < numDocs; i++) {
+          BigDecimal val = reader.getBigDecimal(i, readerContext);
+          creator.putBigDecimal(val);
+        }
+        break;
+      }
+      default:
+        throw new IllegalStateException("Unsupported storedType=" + 
reader.getStoredType() + " for column=" + column);
+    }
+  }
+
+  private void forwardIndexReadDictWriteRawHelper(String column, 
ColumnMetadata existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs, 
Dictionary dictionaryReader) {
+    ForwardIndexReaderContext readerContext = reader.createContext();
+    boolean isSVColumn = reader.isSingleValue();
+    int maxNumValuesPerEntry = 
existingColumnMetadata.getMaxNumberOfMultiValues();
+    PinotSegmentColumnReader columnReader =
+        new PinotSegmentColumnReader(reader, dictionaryReader, null, 
maxNumValuesPerEntry);
+    FieldSpec.DataType storedType = 
dictionaryReader.getValueType().getStoredType();
+
+    switch (storedType) {
+      case INT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            Object obj = columnReader.getValue(i);
+            int val = (int) obj;
+            creator.putInt(val);
+          } else {
+            int[] dictIds = reader.getDictIdMV(i, readerContext);
+            int[] ints = new int[dictIds.length];
+            for (int j = 0; j < dictIds.length; j++) {
+              ints[j] = (int) dictionaryReader.get(dictIds[j]);

Review Comment:
   May be for SV also we can follow the same reading pattern to be consistent ?
   
   Read dictID from fwd index reader and then read the dict to get 
corresponding raw value and then insert.
   
   Basically columnReader isn't technically needed ?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -400,117 +436,262 @@ private void rewriteRawSVForwardIndex(String column, 
ColumnMetadata existingColM
         }
 
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, 
numDocs, null);
+        forwardIndexRewriteHelper(column, existingColMetadata, reader, 
creator, numDocs, null, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ColumnMetadata 
existingColumnMetadata, ForwardIndexReader reader,
-      ForwardIndexCreator creator, int numDocs, @Nullable 
SegmentDictionaryCreator dictionaryCreator) {
+  private void forwardIndexRewriteHelper(String column, ColumnMetadata 
existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs,
+      @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable 
Dictionary dictionaryReader) {
+    if (dictionaryReader == null && dictionaryCreator == null) {
+      // Read raw forward index and write raw forward index.
+      forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, 
reader, creator, numDocs);
+    } else if (dictionaryReader != null && dictionaryCreator == null) {
+      // Read dictionary based forward index and write raw forward index.
+      forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, 
reader, creator, numDocs, dictionaryReader);
+    } else if (dictionaryReader == null && dictionaryCreator != null) {
+      // Read raw forward index and write dictionary based forward index.
+      forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, 
reader, creator, numDocs, dictionaryCreator);
+    } else {
+      Preconditions.checkState(false, "Invalid dict-based read/write for 
column=" + column);
+    }
+  }
+
+  private void forwardIndexReadRawWriteRawHelper(String column, ColumnMetadata 
existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
 
-    if (dictionaryCreator != null) {
-      int maxNumValuesPerEntry = 
existingColumnMetadata.getMaxNumberOfMultiValues();
-      PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
-
-      for (int i = 0; i < numDocs; i++) {
-        Object obj = columnReader.getValue(i);
-
-        if (isSVColumn) {
-          int dictId = dictionaryCreator.indexOfSV(obj);
-          creator.putDictId(dictId);
-        } else {
-          int[] dictIds = dictionaryCreator.indexOfMV(obj);
-          creator.putDictIdMV(dictIds);
+    switch (reader.getStoredType()) {
+      // JSON fields are either stored as string or bytes. No special handling 
is needed because we make this
+      // decision based on the storedType of the reader.
+      case INT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            int val = reader.getInt(i, readerContext);
+            creator.putInt(val);
+          } else {
+            int[] ints = reader.getIntMV(i, readerContext);
+            creator.putIntMV(ints);
+          }
         }
+        break;
       }
-    } else {
-      switch (reader.getStoredType()) {
-        // JSON fields are either stored as string or bytes. No special 
handling is needed because we make this
-        // decision based on the storedType of the reader.
-        case INT: {
-          for (int i = 0; i < numDocs; i++) {
-            if (isSVColumn) {
-              int val = reader.getInt(i, readerContext);
-              creator.putInt(val);
-            } else {
-              int[] ints = reader.getIntMV(i, readerContext);
-              creator.putIntMV(ints);
-            }
+      case LONG: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            long val = reader.getLong(i, readerContext);
+            creator.putLong(val);
+          } else {
+            long[] longs = reader.getLongMV(i, readerContext);
+            creator.putLongMV(longs);
           }
-          break;
         }
-        case LONG: {
-          for (int i = 0; i < numDocs; i++) {
-            if (isSVColumn) {
-              long val = reader.getLong(i, readerContext);
-              creator.putLong(val);
-            } else {
-              long[] longs = reader.getLongMV(i, readerContext);
-              creator.putLongMV(longs);
+        break;
+      }
+      case FLOAT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            float val = reader.getFloat(i, readerContext);
+            creator.putFloat(val);
+          } else {
+            float[] floats = reader.getFloatMV(i, readerContext);
+            creator.putFloatMV(floats);
+          }
+        }
+        break;
+      }
+      case DOUBLE: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            double val = reader.getDouble(i, readerContext);
+            creator.putDouble(val);
+          } else {
+            double[] doubles = reader.getDoubleMV(i, readerContext);
+            creator.putDoubleMV(doubles);
+          }
+        }
+        break;
+      }
+      case STRING: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            String val = reader.getString(i, readerContext);
+            creator.putString(val);
+          } else {
+            String[] strings = reader.getStringMV(i, readerContext);
+            creator.putStringMV(strings);
+          }
+        }
+        break;
+      }
+      case BYTES: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            byte[] val = reader.getBytes(i, readerContext);
+            creator.putBytes(val);
+          } else {
+            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+            creator.putBytesMV(bytesArray);
+          }
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        Preconditions.checkState(isSVColumn, "BigDecimal is not supported for 
MV columns");
+        for (int i = 0; i < numDocs; i++) {
+          BigDecimal val = reader.getBigDecimal(i, readerContext);
+          creator.putBigDecimal(val);
+        }
+        break;
+      }
+      default:
+        throw new IllegalStateException("Unsupported storedType=" + 
reader.getStoredType() + " for column=" + column);
+    }
+  }
+
+  private void forwardIndexReadDictWriteRawHelper(String column, 
ColumnMetadata existingColumnMetadata,
+      ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs, 
Dictionary dictionaryReader) {
+    ForwardIndexReaderContext readerContext = reader.createContext();
+    boolean isSVColumn = reader.isSingleValue();
+    int maxNumValuesPerEntry = 
existingColumnMetadata.getMaxNumberOfMultiValues();
+    PinotSegmentColumnReader columnReader =
+        new PinotSegmentColumnReader(reader, dictionaryReader, null, 
maxNumValuesPerEntry);
+    FieldSpec.DataType storedType = 
dictionaryReader.getValueType().getStoredType();
+
+    switch (storedType) {
+      case INT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            Object obj = columnReader.getValue(i);
+            int val = (int) obj;
+            creator.putInt(val);
+          } else {
+            int[] dictIds = reader.getDictIdMV(i, readerContext);
+            int[] ints = new int[dictIds.length];
+            for (int j = 0; j < dictIds.length; j++) {
+              ints[j] = (int) dictionaryReader.get(dictIds[j]);

Review Comment:
   May be for SV also we can follow the same reading pattern to be consistent ?
   
   Read dictID from fwd index reader and then read the dict to get 
corresponding raw value and then insert.
   
   Basically columnReader isn't technically needed ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to