Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1355825121


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -154,4 +179,12 @@ public static Schema getUpdatedSchema(Schema original) {
     }
     return newSchema;
   }
+
+  public boolean isColumnMajorEnabled() {
+    return _enableColumnMajor;
+  }
+
+  public int getTotalDocCount() {
+    return _totalDocs;
+  }

Review Comment:
   These getters seem unused. Suggest removing them and changing these 2 
fields to local variables.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -104,6 +106,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private int _totalDocs;
   private int _docIdCounter;
   private boolean _nullHandlingEnabled;
+  private long _durationNS = 0;

Review Comment:
   Is this used?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
 
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+          throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      }
+    }
+
+    _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+                                Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,
+                                String columnName,
+                                int docId,
+                                NullValueVectorCreator nullVec)
+  throws IOException {
+    Object columnValueToIndex = colReader.getValue(docId);
+    if (columnValueToIndex == null) {
+      throw new RuntimeException("Null value for column:" + columnName);
+    }
+
+    // TODO(ERICH): pull this out of the loop because it only needs to be looked up once per column
+    // TODO(ERICH): do a performance comparison for before and after pulling this out
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+
+    if (_nullHandlingEnabled) {
+      //handling null values
+//            In row oriented:
+//              - this.indexRow iterates over each column and checks if it isNullValue.  If it is then it sets the null
+//              value vector for that doc id
+//              - This null value comes from the GenericRow that is created by PinotSegmentRecordReader
+//              - PinotSegmentRecordReader:L224 is where we figure out the null value stuff
+//              - PSegRecReader calls PinotSegmentColumnReader.isNull on the doc id to determine if the value for that
+//              column of that docId is null
+//              - if it returns true and we are NOT skipping null values we put the default null value into that field
+//              of the GenericRow
+      // TODO(Erich): do we need to check the Skip Null Values flag here?  Yes, it's done in PinotRecordReader
+      if (colReader.isNull(docId)) {

Review Comment:
   This is correct



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -220,15 +222,20 @@ public String getSegmentName() {
   }
 
   public void getRecord(int docId, GenericRow buffer) {
+    // TODO: start duration

Review Comment:
   Is the change in this class temporary debugging code?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
     handlePostCreation();
   }
 
+  public void buildByColumn(IndexSegment indexSegment)
+          throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //    removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      _indexCreator.close(); // TODO: Why is this only closed on an exception?
+      throw e;
+    } finally {
+      _recordReader.close();

Review Comment:
   We should not create a record reader when doing column-major conversion.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -70,11 +72,27 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro
     _tableConfig = tableConfig;
     _segmentName = segmentName;
     _nullHandlingEnabled = nullHandlingEnabled;
+
+    // Check if column major mode should be enabled
+    try {
+      // TODO(Erich): move this so that the code does not directly reference the flag name

Review Comment:
   Let's move this into TableConfig -> IngestionConfig -> 
StreamIngestionConfig, and add a field `_enableColumnMajorSegmentCreation`
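
A rough sketch of what reading the flag through that nesting could look like. The three `*Sketch` config classes below are simplified, hypothetical stand-ins for the real Pinot config classes, and the accessor names are assumptions; only the field name `_enableColumnMajorSegmentCreation` and the nesting order come from the comment above.

```java
// Hypothetical stand-in for StreamIngestionConfig; only the new flag is modeled.
class StreamIngestionConfigSketch {
  // Defaults to false so existing tables keep the row-major path.
  private boolean _enableColumnMajorSegmentCreation;

  boolean isEnableColumnMajorSegmentCreation() {
    return _enableColumnMajorSegmentCreation;
  }

  void setEnableColumnMajorSegmentCreation(boolean enabled) {
    _enableColumnMajorSegmentCreation = enabled;
  }
}

// Hypothetical stand-in for IngestionConfig.
class IngestionConfigSketch {
  private StreamIngestionConfigSketch _streamIngestionConfig;

  StreamIngestionConfigSketch getStreamIngestionConfig() {
    return _streamIngestionConfig;
  }

  void setStreamIngestionConfig(StreamIngestionConfigSketch config) {
    _streamIngestionConfig = config;
  }
}

// Hypothetical stand-in for TableConfig.
class TableConfigSketch {
  private IngestionConfigSketch _ingestionConfig;

  IngestionConfigSketch getIngestionConfig() {
    return _ingestionConfig;
  }

  void setIngestionConfig(IngestionConfigSketch config) {
    _ingestionConfig = config;
  }
}

class ColumnMajorFlagSketch {
  // Null-safe walk: a missing config at any level means the feature is off.
  static boolean isColumnMajorEnabled(TableConfigSketch tableConfig) {
    IngestionConfigSketch ingestion = tableConfig.getIngestionConfig();
    if (ingestion == null) {
      return false;
    }
    StreamIngestionConfigSketch stream = ingestion.getStreamIngestionConfig();
    return stream != null && stream.isEnableColumnMajorSegmentCreation();
  }
}
```

With the flag on a typed config object, the converter no longer needs to reference the flag name as a string.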



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -344,7 +390,7 @@ private void handlePostCreation()
     // Persist creation metadata to disk
     persistCreationMeta(segmentOutputDir, crc, creationTime);
 
-    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+    LOGGER.info("Driver, record read time (NS) : {}", _totalRecordReadTimeNS);

Review Comment:
   Let's keep the log unchanged, but convert the time to ms
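
One way to do this, sketched under the assumption that the field keeps accumulating `System.nanoTime()` deltas and is converted only when logged. `RecordReadTimerSketch` and its methods are hypothetical stand-ins, not Pinot classes; `TimeUnit.NANOSECONDS.toMillis` is the standard JDK conversion and truncates toward zero.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the driver's timing field; not the actual Pinot class.
class RecordReadTimerSketch {
  // Accumulated in nanoseconds so short reads are not rounded away one by one.
  private long _totalRecordReadTimeNs = 0;

  void addNanos(long elapsedNs) {
    _totalRecordReadTimeNs += elapsedNs;
  }

  // Convert once, at reporting time. The result is truncated to whole milliseconds.
  long getTotalRecordReadTimeMs() {
    return TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs);
  }

  // Same message text as the original log line, with the value now in ms.
  String logLine() {
    return "Driver, record read time : " + getTotalRecordReadTimeMs();
  }
}
```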



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
 
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+          throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      }
+    }
+
+    _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+                                Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,

Review Comment:
   (code format) Please follow the Pinot Style



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -102,7 +104,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   private int _totalDocs = 0;
   private File _tempIndexDir;
   private String _segmentName;
-  private long _totalRecordReadTime = 0;
+  private long _totalRecordReadTimeNS = 0;

Review Comment:
   Let's keep the naming consistent
   ```suggestion
     private long _totalRecordReadTimeNs = 0;
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
 
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+          throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);

Review Comment:
   A function call in the loop is fine. Let's extract the per-column logic 
(e.g. map lookups) out of this method to reduce the cost.
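
A minimal sketch of that restructuring, with plain maps and lists standing in for the schema, dictionary creators, and index creators. Every name here is hypothetical and the real Pinot types are not used; the point is only that per-column state is resolved once in `indexColumn` and passed into the per-document helper.

```java
import java.util.List;
import java.util.Map;

// Illustrative only: plain collections stand in for the schema and creator maps.
class HoistedLookupSketch {
  private final Map<String, Boolean> _singleValueByColumn;
  private final Map<String, List<Object>> _sinkByColumn;

  HoistedLookupSketch(Map<String, Boolean> singleValueByColumn, Map<String, List<Object>> sinkByColumn) {
    _singleValueByColumn = singleValueByColumn;
    _sinkByColumn = sinkByColumn;
  }

  void indexColumn(String columnName, Object[] columnValues, int[] sortedDocIds) {
    // Per-column work: resolved once, outside the per-document loop.
    boolean singleValue = _singleValueByColumn.get(columnName);
    List<Object> sink = _sinkByColumn.get(columnName);

    if (sortedDocIds != null) {
      for (int docId : sortedDocIds) {
        indexColumnValue(sink, singleValue, columnValues[docId]);
      }
    } else {
      for (int docId = 0; docId < columnValues.length; docId++) {
        indexColumnValue(sink, singleValue, columnValues[docId]);
      }
    }
  }

  // Per-document work only; no map lookups happen in here.
  private static void indexColumnValue(List<Object> sink, boolean singleValue, Object value) {
    if (singleValue) {
      sink.add(value);
    } else {
      for (Object element : (Object[]) value) {
        sink.add(element);
      }
    }
  }
}
```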



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
     handlePostCreation();
   }
 
+  public void buildByColumn(IndexSegment indexSegment)
+          throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //    removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      _indexCreator.close(); // TODO: Why is this only closed on an exception?

Review Comment:
   In the regular case, it is closed after `handlePostCreation()`, so only 
the failure path needs to close it here.
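
The control flow can be sketched as follows. `DeferredCloseSketch` and its nested `Creator` are hypothetical stand-ins, and placing `close()` immediately after `handlePostCreation()` returns is an assumption made for illustration; the actual driver may structure the success path differently.

```java
// Illustrative stand-ins only; these are not the Pinot driver or creator classes.
class DeferredCloseSketch {
  static class Creator implements AutoCloseable {
    private final boolean _failOnIndex;
    private boolean _sealed;
    private boolean _closed;

    Creator(boolean failOnIndex) {
      _failOnIndex = failOnIndex;
    }

    void indexColumn(String column) throws Exception {
      if (_failOnIndex) {
        throw new Exception("indexing failed for " + column);
      }
    }

    void seal() {
      _sealed = true;
    }

    @Override
    public void close() {
      _closed = true;
    }

    boolean isSealed() {
      return _sealed;
    }

    boolean isClosed() {
      return _closed;
    }
  }

  private final Creator _creator;

  DeferredCloseSketch(Creator creator) {
    _creator = creator;
  }

  void buildByColumn() throws Exception {
    try {
      _creator.indexColumn("col");
    } catch (Exception e) {
      // Failure path: post-creation never runs, so release the creator here.
      _creator.close();
      throw e;
    }
    // Success path: the creator stays open because post-creation still needs it.
    handlePostCreation();
    _creator.close();
  }

  private void handlePostCreation() {
    _creator.seal();
  }
}
```

A try-with-resources block would close the creator before post-creation could use it, which is why the close is split across the two paths.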



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java:
##########
@@ -55,6 +57,16 @@ void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo s
   void indexRow(GenericRow row)
       throws IOException;
 
+  /**
+   * Adds a column to the index.
+   *
+   * @param columnName - The name of the column being added to.
+   * @param sortedDocIds - If not null, then this provides the sorted order of documents.
+   * @param colReader - Used to get the values of the column.
+   */
+  void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment colReader)

Review Comment:
   The third argument is not really a column reader



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

