Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1367573227


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -336,6 +338,60 @@ public void indexRow(GenericRow row)
     _docIdCounter++;
   }
 
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    // Iterate over each value in the column
+    int numDocs = segment.getSegmentMetadata().getTotalDocs();
+    if (numDocs == 0) {
+      return;
+    }
+
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec, skipDefaultNullValues);
+          onDiskDocId += 1;

Review Comment:
   (nit)
   ```suggestion
             onDiskDocId++;
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java:
##########
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("All configs for the streams from which to ingest")
   private final List<Map<String, String>> _streamConfigMaps;
 
+  @JsonPropertyDescription("Whether to use column major mode when creating the segment.")
+  private boolean _columnMajorSegmentBuilderEnabled;

Review Comment:
   Since this is a newly added temporary flag, I don't see much value in supporting it in 2 different places. Let's remove it from here and keep only the one in `IndexingConfig` for simplicity.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -194,6 +194,10 @@ public int[] getSortedDocIds() {
     return _sortedDocIds;
   }
 
+  public boolean getSkipDefaultNullValues() {

Review Comment:
   Hmm, it seems this has not changed. Was there a commit that was not pushed?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -117,6 +113,7 @@ public enum TimeColumnType {
   // Use on-heap or off-heap memory to generate index (currently only affect inverted index and star-tree v2)
   private boolean _onHeap = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _columnMajorSegmentBuilderEnabled = false;

Review Comment:
   This is not used and not needed.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -229,16 +232,21 @@ public void build()
       GenericRow reuse = new GenericRow();
       TransformPipeline.Result reusedResult = new TransformPipeline.Result();
       while (_recordReader.hasNext()) {
-        long recordReadStartTime = System.currentTimeMillis();
-        long recordReadStopTime = System.currentTimeMillis();
+        long recordReadStopTime = System.nanoTime();

Review Comment:
   My IDE flags the redundant assignment here; not sure if you need to enable that inspection explicitly.
   ```suggestion
           long recordReadStopTime;
   ```
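
To illustrate the suggestion above, here is a minimal sketch of a timing loop in which the stop timestamp is declared without an initializer and assigned once, right before it is first read. The class name `DeferredInitSketch`, the method `totalReadTimeNs`, and the iterator standing in for the record reader are all hypothetical; this is not the driver's actual loop.

```java
import java.util.Iterator;
import java.util.List;

public class DeferredInitSketch {
  // Sums the time spent in reader.next() across all records, in nanoseconds.
  // The iterator is a hypothetical stand-in for a record reader.
  static long totalReadTimeNs(Iterator<String> reader) {
    long totalNs = 0;
    while (reader.hasNext()) {
      long readStartNs = System.nanoTime();
      // No initializer: the variable is assigned exactly once below,
      // so an initial value here would be overwritten without being read.
      long readStopNs;
      reader.next();
      readStopNs = System.nanoTime();
      totalNs += readStopNs - readStartNs;
    }
    return totalNs;
  }

  public static void main(String[] args) {
    long totalNs = totalReadTimeNs(List.of("row1", "row2", "row3").iterator());
    System.out.println("total read time (ns): " + totalNs);
  }
}
```

Java's definite-assignment rules make the compiler reject any read of `readStopNs` that could happen before the assignment, so dropping the initializer does not weaken safety.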



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -336,6 +338,60 @@ public void indexRow(GenericRow row)
     _docIdCounter++;
   }
 
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    // Iterate over each value in the column
+    int numDocs = segment.getSegmentMetadata().getTotalDocs();
+    if (numDocs == 0) {
+      return;
+    }
+
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec, skipDefaultNullValues);
+          onDiskDocId += 1;
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec,
+              skipDefaultNullValues);
+        }
+      }
+    }
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
+      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    Object columnValueToIndex = colReader.getValue(sourceDocId);
+    if (columnValueToIndex == null) {
+      throw new RuntimeException("Null value for column:" + columnName);
+    }
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+    }
+
+    if (_nullHandlingEnabled && !skipDefaultNullValues) {

Review Comment:
   Remove the second check
   ```suggestion
       if (_nullHandlingEnabled) {
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -344,12 +394,13 @@ private void handlePostCreation()
     // Persist creation metadata to disk
     persistCreationMeta(segmentOutputDir, crc, creationTime);
 
-    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+    LOGGER.info("Driver, record read time : {}", ((float) _totalRecordReadTimeNs) / 1000000.0);

Review Comment:
   We don't want to log the time as a float
   ```suggestion
       LOGGER.info("Driver, record read time : {}", TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs));
   ```
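
For reference, a small standalone sketch of the conversion in the suggestion above. `TimeUnit.NANOSECONDS.toMillis` is the standard `java.util.concurrent` call and truncates toward zero; the class name `ReadTimeLogSketch`, the helper `toWholeMillis`, and the sample value are hypothetical and only stand in for the driver's accumulated `_totalRecordReadTimeNs`.

```java
import java.util.concurrent.TimeUnit;

public class ReadTimeLogSketch {
  // Converts an accumulated nanosecond total to whole milliseconds.
  // The result is a long, so the logged value has no fractional part.
  static long toWholeMillis(long elapsedNs) {
    return TimeUnit.NANOSECONDS.toMillis(elapsedNs);
  }

  public static void main(String[] args) {
    long totalRecordReadTimeNs = 1_234_567_890L; // hypothetical sample total

    // Original approach: cast to float, then divide. The float cast keeps only
    // about 7 significant digits, so the printed value is approximate.
    double viaFloat = ((float) totalRecordReadTimeNs) / 1000000.0;
    System.out.println("via float cast: " + viaFloat);

    // Suggested approach: integer milliseconds.
    System.out.println("via TimeUnit:   " + toWholeMillis(totalRecordReadTimeNs)); // prints 1234
  }
}
```

Keeping the running total in nanoseconds and converting only at log time means the truncation happens once, on the sum, rather than once per record.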


