Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1355825121

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -154,4 +179,12 @@ public static Schema getUpdatedSchema(Schema original) {
     }
     return newSchema;
   }
+
+  public boolean isColumnMajorEnabled() {
+    return _enableColumnMajor;
+  }
+
+  public int getTotalDocCount() {
+    return _totalDocs;
+  }

Review Comment:
   Seems not used. Suggest removing them and changing these 2 variables to local variables.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -104,6 +106,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private int _totalDocs;
   private int _docIdCounter;
   private boolean _nullHandlingEnabled;
+  private long _durationNS = 0;

Review Comment:
   Is this used?

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      }
+    }
+
+    _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+                                Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,
+                                String columnName,
+                                int docId,
+                                NullValueVectorCreator nullVec)
+      throws IOException {
+    Object columnValueToIndex = colReader.getValue(docId);
+    if (columnValueToIndex == null) {
+      throw new RuntimeException("Null value for column:" + columnName);
+    }
+
+    // TODO(ERICH): pull this out of the loop because it only needs to be looked up once per column
+    // TODO(ERICH): do a performance comparison for before and after pulling this out
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+
+    if (_nullHandlingEnabled) {
+      //handling null values
+// In row oriented:
+// - this.indexRow iterates over each column and checks if it isNullValue. If it is then it sets the null
+// value vector for that doc id
+// - This null value comes from the GenericRow that is created by PinotSegmentRecordReader
+// - PinotSegmentRecordReader:L224 is where we figure out the null value stuff
+// - PSegRecReader calls PinotSegmentColumnReader.isNull on the doc id to determine if the value for that
+// column of that docId is null
+// - if it returns true and we are NOT skipping null values we put the default null value into that field
+// of the GenericRow
+      // TODO(Erich): do we need to check the Skip Null Values flag here? Yes, it's done in PinotRecordReader
+      if (colReader.isNull(docId)) {

Review Comment:
   This is correct

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -220,15 +222,20 @@ public String getSegmentName() {
   }
   public void getRecord(int docId, GenericRow buffer) {
+    // TODO: start duration

Review Comment:
   Is the change in this class temporary debugging code?

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
     handlePostCreation();
   }
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //       removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      _indexCreator.close(); // TODO: Why is this only closed on an exception?
+      throw e;
+    } finally {
+      _recordReader.close();

Review Comment:
   We should not create record reader when doing column major conversion

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -70,11 +72,27 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro
     _tableConfig = tableConfig;
     _segmentName = segmentName;
     _nullHandlingEnabled = nullHandlingEnabled;
+
+    // Check if column major mode should be enabled
+    try {
+      // TODO(Erich): move this so that the code does not directly reference the flag name

Review Comment:
   Let's move this into TableConfig -> IngestionConfig -> StreamIngestionConfig, and add a field `_enableColumnMajorSegmentCreation`

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -344,7 +390,7 @@ private void handlePostCreation()
     // Persist creation metadata to disk
     persistCreationMeta(segmentOutputDir, crc, creationTime);
-    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+    LOGGER.info("Driver, record read time (NS) : {}", _totalRecordReadTimeNS);

Review Comment:
   Let's keep the log unchanged, but convert the time to ms

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);
+        }
+      }
+    }
+
+    _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+                                Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,

Review Comment:
   (code format) Please follow the Pinot Style

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -102,7 +104,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   private int _totalDocs = 0;
   private File _tempIndexDir;
   private String _segmentName;
-  private long _totalRecordReadTime = 0;
+  private long _totalRecordReadTimeNS = 0;

Review Comment:
   Let's keep the naming consistent
   ```suggestion
     private long _totalRecordReadTimeNs = 0;
   ```

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
     }
     _docIdCounter++;
+    _durationNS += System.nanoTime() - startNS;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many indexes per column are there)
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        for (int docId : sortedDocIds) {
+          // TODO(Erich): should I avoid a function call in the loop like this?
+          indexColumnValue(colReader, creatorsByIndex, columnName, docId, nullVec);

Review Comment:
   Function call in the loop is fine. Let's extract the per column logic (e.g. map lookups) outside of this method to reduce the cost

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
     handlePostCreation();
   }
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //       removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      _indexCreator.close(); // TODO: Why is this only closed on an exception?

Review Comment:
   In the regular case, it will be closed after `handlePostCreation()`

##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java:
##########
@@ -55,6 +57,16 @@ void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo s
   void indexRow(GenericRow row)
       throws IOException;
+  /**
+   * Adds a column to the index.
+   *
+   * @param columnName - The name of the column being added to.
+   * @param sortedDocIds - If not null, then this provides the sorted order of documents.
+   * @param colReader - Used to get the values of the column.
+   */
+  void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment colReader)

Review Comment:
   The third argument is not really a column reader
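
For the comments above about keeping the "record read time" log line unchanged while reporting milliseconds, and about the `Ns` suffix, here is a minimal sketch of one way to do it. It assumes the time is accumulated in nanoseconds and converted only when logging. `RecordReadTimer` and its methods are hypothetical stand-ins for illustration, not the actual Pinot driver code; only `java.util.concurrent.TimeUnit` is a real API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the driver's timing fields; not the actual Pinot class.
final class RecordReadTimer {
  // Accumulated in nanoseconds, named with the `Ns` suffix as suggested in the review.
  private long _totalRecordReadTimeNs = 0;

  // Adds one measured interval, given System.nanoTime()-style start and end readings.
  void record(long startNs, long endNs) {
    _totalRecordReadTimeNs += endNs - startNs;
  }

  // Converts the accumulated nanoseconds to whole milliseconds (the conversion truncates).
  long totalRecordReadTimeMs() {
    return TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs);
  }

  // Builds the same message text as the original log line, with the value in ms.
  String logMessage() {
    return "Driver, record read time : " + totalRecordReadTimeMs();
  }
}
```

Accumulating in nanoseconds and converting once at log time avoids losing sub-millisecond intervals, which would each truncate to zero if they were converted before being summed.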