siddharthteotia commented on code in PR #9810:
URL: https://github.com/apache/pinot/pull/9810#discussion_r1038882038


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.MAX_MULTI_VALUE_ELEMENTS;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
+/**
+ * Helper classed used by the {@link SegmentPreProcessor} to generate the 
forward index from inverted index and
+ * dictionary when the forward index is enabled for columns where it was 
previously disabled. This is also invoked by
+ * the {@link IndexHandler} code in scenarios where the forward index needs to 
be temporarily created to generate other
+ * indexes for the given column. In such cases the forward index will be 
cleaned up after the {@link IndexHandler} code
+ * completes.
+ *
+ * For multi-value columns the following invariants cannot be maintained:
+ * - Ordering of elements within a given multi-value row. This will always be 
a limitation.
+ * - Data loss is possible if there repeats for elements within a given 
multi-value row. This limitation will be
+ *   addressed as a future change
+ *
+ * TODO: Currently for multi-value columns generating the forward index can 
lead to a data loss as frequency information
+ *       is not available for repeats within a given row. This needs to be 
addressed by tracking the frequency data
+ *       as part of an on-disk structure when forward index is disabled for a 
column.
+ */
+public class InvertedIndexAndDictionaryBasedForwardIndexCreator implements 
AutoCloseable {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InvertedIndexAndDictionaryBasedForwardIndexCreator.class);
+
+  // Use MMapBuffer if the value buffer size is larger than 2G
+  private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
+
+  private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = 
".fwd.idx.val.buf";
+  private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = 
".fwd.idx.len.buf";
+  private static final String FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX = 
".fwd.idx.maxsize.buf";
+
+  private final String _columnName;
+  private final SegmentMetadata _segmentMetadata;
+  private final IndexLoadingConfig _indexLoadingConfig;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final IndexCreatorProvider _indexCreatorProvider;
+  private final boolean _isTemporaryForwardIndex;
+
+  // Metadata
+  private final ColumnMetadata _columnMetadata;
+  private final boolean _singleValue;
+  private final int _cardinality;
+  private final int _numDocs;
+  private final int _maxNumberOfMultiValues;
+  private final int _totalNumberOfEntries;
+  private final boolean _dictionaryEnabled;
+  private final ChunkCompressionType _chunkCompressionType;
+  private final boolean _useMMapBuffer;
+
+  // Files and temporary buffers
+  private final File _forwardIndexFile;
+  private final File _forwardIndexValueBufferFile;
+  private final File _forwardIndexLengthBufferFile;
+  private final File _forwardIndexMaxSizeBufferFile;
+
+  // Forward index buffers (to store the dictId at the correct docId)
+  private PinotDataBuffer _forwardIndexValueBuffer;
+  // For multi-valued column only because each docId can have multiple dictIds
+  private PinotDataBuffer _forwardIndexLengthBuffer;
+  private int _nextValueId;
+  // For multi-valued column only to track max row size
+  private PinotDataBuffer _forwardIndexMaxSizeBuffer;
+
+  public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, 
SegmentMetadata segmentMetadata,
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter,
+      IndexCreatorProvider indexCreatorProvider, boolean 
isTemporaryForwardIndex)
+      throws IOException {
+    _columnName = columnName;
+    _segmentMetadata = segmentMetadata;
+    _indexLoadingConfig = indexLoadingConfig;
+    _segmentWriter = segmentWriter;
+    _indexCreatorProvider = indexCreatorProvider;
+    _isTemporaryForwardIndex = isTemporaryForwardIndex;
+
+    _columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+    _singleValue = _columnMetadata.isSingleValue();
+    _cardinality = _columnMetadata.getCardinality();
+    _numDocs = _columnMetadata.getTotalDocs();
+    _totalNumberOfEntries = _columnMetadata.getTotalNumberOfEntries();
+    _maxNumberOfMultiValues = _columnMetadata.getMaxNumberOfMultiValues();
+    _dictionaryEnabled = 
!_indexLoadingConfig.getNoDictionaryColumns().contains(columnName);
+    _chunkCompressionType = getColumnCompressionType();
+    int numValues = _singleValue ? _numDocs : _totalNumberOfEntries;
+    _useMMapBuffer = numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+
+    // Sorted columns should never need recreation of the forward index as the 
forwardIndexDisabled flag is treated as
+    // a no-op for sorted columns
+    File indexDir = segmentMetadata.getIndexDir();
+    String fileExtension;
+    if (_dictionaryEnabled) {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    } else {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
+    _forwardIndexFile = new File(indexDir, columnName + fileExtension);
+    _forwardIndexValueBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _forwardIndexMaxSizeBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX);
+
+    // Create the temporary buffers needed
+    try {
+      _forwardIndexValueBuffer = createTempBuffer((long) numValues * 
Integer.BYTES, _forwardIndexValueBufferFile);
+      if (!_singleValue) {
+        _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexLengthBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index length buffer because we rely 
on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+        _forwardIndexMaxSizeBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexMaxSizeBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index max size buffer because we 
rely on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexMaxSizeBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+      }
+    } catch (Exception e) {
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      destroyBuffer(_forwardIndexMaxSizeBuffer, 
_forwardIndexMaxSizeBufferFile);
+      throw new IOException("Couldn't create temp buffers to construct forward 
index", e);
+    }
+  }
+
+  private ChunkCompressionType getColumnCompressionType() {
+    if (!_indexLoadingConfig.getNoDictionaryColumns().contains(_columnName)) {
+      return null;
+    }
+
+    ChunkCompressionType compressionType = null;
+    if (_indexLoadingConfig.getCompressionConfigs().containsKey(_columnName)) {
+      compressionType = 
_indexLoadingConfig.getCompressionConfigs().get(_columnName);
+    } else if 
(_indexLoadingConfig.getNoDictionaryConfig().containsKey(_columnName)) {
+      compressionType = 
ChunkCompressionType.valueOf(_indexLoadingConfig.getNoDictionaryConfig().get(_columnName));
+    }
+
+    // This logic to choose the default compressionType is duplicated from
+    // SegmentColumnarIndexCreator::getColumnCompressionType
+    if (compressionType == null) {
+      if (_columnMetadata.getFieldSpec().getFieldType() == 
FieldSpec.FieldType.METRIC) {
+        compressionType = ChunkCompressionType.PASS_THROUGH;
+      } else {
+        compressionType = ChunkCompressionType.LZ4;
+      }
+    }

Review Comment:
   IIRC, Vivek also wrote a similar helper method in one of his recent PRs. Is 
it possible to make this a public static method in `SegmentColumnarIndexCreator` 
so that the default-compression selection logic lives in one place?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to