Jackie-Jiang commented on code in PR #11776: URL: https://github.com/apache/pinot/pull/11776#discussion_r1367573227
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -336,6 +338,60 @@ public void indexRow(GenericRow row) _docIdCounter++; } + @Override + public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment, + boolean skipDefaultNullValues) + throws IOException { + // Iterate over each value in the column + int numDocs = segment.getSegmentMetadata().getTotalDocs(); + if (numDocs == 0) { + return; + } + + try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) { + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName); + NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName); + FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); + SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); + if (sortedDocIds != null) { + int onDiskDocId = 0; + for (int docId : sortedDocIds) { + indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId, + nullVec, skipDefaultNullValues); + onDiskDocId += 1; Review Comment: (nit) ```suggestion onDiskDocId++; ``` ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java: ########## @@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("All configs for the streams from which to ingest") private final List<Map<String, String>> _streamConfigMaps; + @JsonPropertyDescription("Whether to use column major mode when creating the segment.") + private boolean _columnMajorSegmentBuilderEnabled; Review Comment: Since this is a newly added temporary flag, I don't see much value in supporting it in two different places. 
Let's just remove it from here and only keep the one in `IndexingConfig` for simplicity. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java: ########## @@ -194,6 +194,10 @@ public int[] getSortedDocIds() { return _sortedDocIds; } + public boolean getSkipDefaultNullValues() { Review Comment: Hmm, it seems this has not changed. Was there a commit that was not pushed? ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java: ########## @@ -117,6 +113,7 @@ public enum TimeColumnType { // Use on-heap or off-heap memory to generate index (currently only affect inverted index and star-tree v2) private boolean _onHeap = false; private boolean _nullHandlingEnabled = false; + private boolean _columnMajorSegmentBuilderEnabled = false; Review Comment: This is not used and not needed. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java: ########## @@ -229,16 +232,21 @@ public void build() GenericRow reuse = new GenericRow(); TransformPipeline.Result reusedResult = new TransformPipeline.Result(); while (_recordReader.hasNext()) { - long recordReadStartTime = System.currentTimeMillis(); - long recordReadStopTime = System.currentTimeMillis(); + long recordReadStopTime = System.nanoTime(); Review Comment: My IDE shows this as a redundant statement; I'm not sure whether you need to enable that inspection explicitly. ```suggestion long recordReadStopTime; ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -336,6 +338,60 @@ public void indexRow(GenericRow row) _docIdCounter++; } + @Override + public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment, + boolean skipDefaultNullValues) + throws IOException { + // Iterate over each value in the column + int numDocs = 
segment.getSegmentMetadata().getTotalDocs(); + if (numDocs == 0) { + return; + } + + try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) { + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName); + NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName); + FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); + SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); + if (sortedDocIds != null) { + int onDiskDocId = 0; + for (int docId : sortedDocIds) { + indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId, + nullVec, skipDefaultNullValues); + onDiskDocId += 1; + } + } else { + for (int docId = 0; docId < numDocs; docId++) { + indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec, + skipDefaultNullValues); + } + } + } + } + + private void indexColumnValue(PinotSegmentColumnReader colReader, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec, + SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec, + boolean skipDefaultNullValues) + throws IOException { + Object columnValueToIndex = colReader.getValue(sourceDocId); + if (columnValueToIndex == null) { + throw new RuntimeException("Null value for column:" + columnName); + } + + if (fieldSpec.isSingleValueField()) { + indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex); + } else { + indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex); + } + + if (_nullHandlingEnabled && !skipDefaultNullValues) { Review Comment: Remove the second check ```suggestion if (_nullHandlingEnabled) { ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java: ########## @@ 
-344,12 +394,13 @@ private void handlePostCreation() // Persist creation metadata to disk persistCreationMeta(segmentOutputDir, crc, creationTime); - LOGGER.info("Driver, record read time : {}", _totalRecordReadTime); + LOGGER.info("Driver, record read time : {}", ((float) _totalRecordReadTimeNs) / 1000000.0); Review Comment: We don't want to log the time as a float. ```suggestion LOGGER.info("Driver, record read time : {}", TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org