siddharthteotia commented on code in PR #9810:
URL: https://github.com/apache/pinot/pull/9810#discussion_r1038883189


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.MAX_MULTI_VALUE_ELEMENTS;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
+/**
+ * Helper class used by the {@link SegmentPreProcessor} to generate the 
forward index from inverted index and
+ * dictionary when the forward index is enabled for columns where it was 
previously disabled. This is also invoked by
+ * the {@link IndexHandler} code in scenarios where the forward index needs to 
be temporarily created to generate other
+ * indexes for the given column. In such cases the forward index will be 
cleaned up after the {@link IndexHandler} code
+ * completes.
+ *
+ * For multi-value columns the following invariants cannot be maintained:
+ * - Ordering of elements within a given multi-value row. This will always be 
a limitation.
+ * - Data loss is possible if there are repeats for elements within a given 
multi-value row. This limitation will be
+ *   addressed as a future change
+ *
+ * TODO: Currently for multi-value columns generating the forward index can 
lead to a data loss as frequency information
+ *       is not available for repeats within a given row. This needs to be 
addressed by tracking the frequency data
+ *       as part of an on-disk structure when forward index is disabled for a 
column.
+ */
+public class InvertedIndexAndDictionaryBasedForwardIndexCreator implements 
AutoCloseable {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InvertedIndexAndDictionaryBasedForwardIndexCreator.class);
+
+  // Use MMapBuffer if the value buffer size is larger than 2G
+  private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
+
+  private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = 
".fwd.idx.val.buf";
+  private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = 
".fwd.idx.len.buf";
+  private static final String FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX = 
".fwd.idx.maxsize.buf";
+
+  private final String _columnName;
+  private final SegmentMetadata _segmentMetadata;
+  private final IndexLoadingConfig _indexLoadingConfig;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final IndexCreatorProvider _indexCreatorProvider;
+  private final boolean _isTemporaryForwardIndex;
+
+  // Metadata
+  private final ColumnMetadata _columnMetadata;
+  private final boolean _singleValue;
+  private final int _cardinality;
+  private final int _numDocs;
+  private final int _maxNumberOfMultiValues;
+  private final int _totalNumberOfEntries;
+  private final boolean _dictionaryEnabled;
+  private final ChunkCompressionType _chunkCompressionType;
+  private final boolean _useMMapBuffer;
+
+  // Files and temporary buffers
+  private final File _forwardIndexFile;
+  private final File _forwardIndexValueBufferFile;
+  private final File _forwardIndexLengthBufferFile;
+  private final File _forwardIndexMaxSizeBufferFile;
+
+  // Forward index buffers (to store the dictId at the correct docId)
+  private PinotDataBuffer _forwardIndexValueBuffer;
+  // For multi-valued column only because each docId can have multiple dictIds
+  private PinotDataBuffer _forwardIndexLengthBuffer;
+  private int _nextValueId;
+  // For multi-valued column only to track max row size
+  private PinotDataBuffer _forwardIndexMaxSizeBuffer;
+
+  public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, 
SegmentMetadata segmentMetadata,
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter,
+      IndexCreatorProvider indexCreatorProvider, boolean 
isTemporaryForwardIndex)
+      throws IOException {
+    _columnName = columnName;
+    _segmentMetadata = segmentMetadata;
+    _indexLoadingConfig = indexLoadingConfig;
+    _segmentWriter = segmentWriter;
+    _indexCreatorProvider = indexCreatorProvider;
+    _isTemporaryForwardIndex = isTemporaryForwardIndex;
+
+    _columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+    _singleValue = _columnMetadata.isSingleValue();
+    _cardinality = _columnMetadata.getCardinality();
+    _numDocs = _columnMetadata.getTotalDocs();
+    _totalNumberOfEntries = _columnMetadata.getTotalNumberOfEntries();
+    _maxNumberOfMultiValues = _columnMetadata.getMaxNumberOfMultiValues();
+    _dictionaryEnabled = 
!_indexLoadingConfig.getNoDictionaryColumns().contains(columnName);
+    _chunkCompressionType = getColumnCompressionType();
+    int numValues = _singleValue ? _numDocs : _totalNumberOfEntries;
+    _useMMapBuffer = numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+
+    // Sorted columns should never need recreation of the forward index as the 
forwardIndexDisabled flag is treated as
+    // a no-op for sorted columns
+    File indexDir = segmentMetadata.getIndexDir();
+    String fileExtension;
+    if (_dictionaryEnabled) {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    } else {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
+    _forwardIndexFile = new File(indexDir, columnName + fileExtension);
+    _forwardIndexValueBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _forwardIndexMaxSizeBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX);
+
+    // Create the temporary buffers needed
+    try {
+      _forwardIndexValueBuffer = createTempBuffer((long) numValues * 
Integer.BYTES, _forwardIndexValueBufferFile);
+      if (!_singleValue) {
+        _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexLengthBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index length buffer because we rely 
on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+        _forwardIndexMaxSizeBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexMaxSizeBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index max size buffer because we 
rely on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexMaxSizeBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+      }
+    } catch (Exception e) {
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      destroyBuffer(_forwardIndexMaxSizeBuffer, 
_forwardIndexMaxSizeBufferFile);
+      throw new IOException("Couldn't create temp buffers to construct forward 
index", e);
+    }
+  }
+
+  private ChunkCompressionType getColumnCompressionType() {
+    if (!_indexLoadingConfig.getNoDictionaryColumns().contains(_columnName)) {
+      return null;
+    }
+
+    ChunkCompressionType compressionType = null;
+    if (_indexLoadingConfig.getCompressionConfigs().containsKey(_columnName)) {
+      compressionType = 
_indexLoadingConfig.getCompressionConfigs().get(_columnName);
+    } else if 
(_indexLoadingConfig.getNoDictionaryConfig().containsKey(_columnName)) {
+      compressionType = 
ChunkCompressionType.valueOf(_indexLoadingConfig.getNoDictionaryConfig().get(_columnName));
+    }
+
+    // This logic to choose the default compressionType is duplicated from
+    // SegmentColumnarIndexCreator::getColumnCompressionType
+    if (compressionType == null) {
+      if (_columnMetadata.getFieldSpec().getFieldType() == 
FieldSpec.FieldType.METRIC) {
+        compressionType = ChunkCompressionType.PASS_THROUGH;
+      } else {
+        compressionType = ChunkCompressionType.LZ4;
+      }
+    }
+    return compressionType;
+  }
+
+  public void constructForwardIndexFromInvertedIndexAndDictionary()
+      throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, _columnName + ".fwd.inprogress");
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(_forwardIndexFile);
+    }
+
+    // Create new forward index for the column.
+    LOGGER.info("Creating a new forward index for segment: {}, column: {}, 
isTemporary: {}", segmentName, _columnName,
+        _isTemporaryForwardIndex);
+
+    Map<String, String> metadataProperties;
+    if (_singleValue) {
+      metadataProperties = createForwardIndexForSVColumn();
+    } else {
+      metadataProperties = createForwardIndexForMVColumn();
+    }
+
+    LoaderUtils.writeIndexToV3Format(_segmentWriter, _columnName, 
_forwardIndexFile, ColumnIndexType.FORWARD_INDEX);
+
+    if (!_isTemporaryForwardIndex) {
+      // Only update the metadata and cleanup other indexes if the forward 
index to be created is permanent. If the
+      // forward index is temporary, it is meant to be used only for 
construction of other indexes and will be deleted
+      // once all the IndexHandlers have completed.
+      try {
+        LOGGER.info("Created forward index from inverted index and dictionary. 
Updating metadata properties for "
+            + "segment: {}, column: {}, property list: {}", segmentName, 
_columnName, metadataProperties);
+        
ForwardIndexHandler.updateMetadataProperties(_segmentMetadata.getIndexDir(), 
metadataProperties);
+      } catch (Exception e) {
+        throw new IOException(
+            String.format("Failed to update metadata properties for segment: 
%s, column: %s", segmentName, _columnName),
+            e);
+      }
+
+      if (!_dictionaryEnabled) {
+        LOGGER.info("Clean up indexes no longer needed or which need to be 
rewritten for segment: {}, column: {}",
+            segmentName, _columnName);
+        // Delete the dictionary
+        _segmentWriter.removeIndex(_columnName, ColumnIndexType.DICTIONARY);
+
+        // We remove indexes that have to be rewritten when a dictEnabled is 
toggled. Note that the respective index
+        // handler will take care of recreating the index.
+        ForwardIndexHandler.removeDictRelatedIndexes(_columnName, 
_segmentWriter);
+      }
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created a new forward index for segment: {}, column: {}, 
isTemporary: {}", segmentName, _columnName,
+        _isTemporaryForwardIndex);
+  }
+
+  private Map<String, String> createForwardIndexForSVColumn()
+      throws IOException {
+    try (BitmapInvertedIndexReader invertedIndexReader =
+        (BitmapInvertedIndexReader) 
LoaderUtils.getInvertedIndexReader(_segmentWriter, _columnMetadata);
+        Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, 
_columnMetadata)) {
+      boolean shouldTrackVarLengthMetadata = 
!_columnMetadata.getFieldSpec().getDataType().isFixedWidth();
+      int[] lengthOfLongestEntry = shouldTrackVarLengthMetadata ? new int[]{0} 
: new int[]{-1};
+      // Construct the forward index in the values buffer
+      for (int dictId = 0; dictId < _cardinality; dictId++) {
+        ImmutableRoaringBitmap docIdsBitmap = 
invertedIndexReader.getDocIds(dictId);
+        int finalDictId = dictId;

Review Comment:
   Curious why we need this temp assignment to `finalDictId`? Maybe I am 
missing something.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.MAX_MULTI_VALUE_ELEMENTS;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES;
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
+/**
+ * Helper class used by the {@link SegmentPreProcessor} to generate the 
forward index from inverted index and
+ * dictionary when the forward index is enabled for columns where it was 
previously disabled. This is also invoked by
+ * the {@link IndexHandler} code in scenarios where the forward index needs to 
be temporarily created to generate other
+ * indexes for the given column. In such cases the forward index will be 
cleaned up after the {@link IndexHandler} code
+ * completes.
+ *
+ * For multi-value columns the following invariants cannot be maintained:
+ * - Ordering of elements within a given multi-value row. This will always be 
a limitation.
+ * - Data loss is possible if there are repeats for elements within a given 
multi-value row. This limitation will be
+ *   addressed as a future change
+ *
+ * TODO: Currently for multi-value columns generating the forward index can 
lead to a data loss as frequency information
+ *       is not available for repeats within a given row. This needs to be 
addressed by tracking the frequency data
+ *       as part of an on-disk structure when forward index is disabled for a 
column.
+ */
+public class InvertedIndexAndDictionaryBasedForwardIndexCreator implements 
AutoCloseable {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InvertedIndexAndDictionaryBasedForwardIndexCreator.class);
+
+  // Use MMapBuffer if the value buffer size is larger than 2G
+  private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
+
+  private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = 
".fwd.idx.val.buf";
+  private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = 
".fwd.idx.len.buf";
+  private static final String FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX = 
".fwd.idx.maxsize.buf";
+
+  private final String _columnName;
+  private final SegmentMetadata _segmentMetadata;
+  private final IndexLoadingConfig _indexLoadingConfig;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final IndexCreatorProvider _indexCreatorProvider;
+  private final boolean _isTemporaryForwardIndex;
+
+  // Metadata
+  private final ColumnMetadata _columnMetadata;
+  private final boolean _singleValue;
+  private final int _cardinality;
+  private final int _numDocs;
+  private final int _maxNumberOfMultiValues;
+  private final int _totalNumberOfEntries;
+  private final boolean _dictionaryEnabled;
+  private final ChunkCompressionType _chunkCompressionType;
+  private final boolean _useMMapBuffer;
+
+  // Files and temporary buffers
+  private final File _forwardIndexFile;
+  private final File _forwardIndexValueBufferFile;
+  private final File _forwardIndexLengthBufferFile;
+  private final File _forwardIndexMaxSizeBufferFile;
+
+  // Forward index buffers (to store the dictId at the correct docId)
+  private PinotDataBuffer _forwardIndexValueBuffer;
+  // For multi-valued column only because each docId can have multiple dictIds
+  private PinotDataBuffer _forwardIndexLengthBuffer;
+  private int _nextValueId;
+  // For multi-valued column only to track max row size
+  private PinotDataBuffer _forwardIndexMaxSizeBuffer;
+
+  public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, 
SegmentMetadata segmentMetadata,
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter,
+      IndexCreatorProvider indexCreatorProvider, boolean 
isTemporaryForwardIndex)
+      throws IOException {
+    _columnName = columnName;
+    _segmentMetadata = segmentMetadata;
+    _indexLoadingConfig = indexLoadingConfig;
+    _segmentWriter = segmentWriter;
+    _indexCreatorProvider = indexCreatorProvider;
+    _isTemporaryForwardIndex = isTemporaryForwardIndex;
+
+    _columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+    _singleValue = _columnMetadata.isSingleValue();
+    _cardinality = _columnMetadata.getCardinality();
+    _numDocs = _columnMetadata.getTotalDocs();
+    _totalNumberOfEntries = _columnMetadata.getTotalNumberOfEntries();
+    _maxNumberOfMultiValues = _columnMetadata.getMaxNumberOfMultiValues();
+    _dictionaryEnabled = 
!_indexLoadingConfig.getNoDictionaryColumns().contains(columnName);
+    _chunkCompressionType = getColumnCompressionType();
+    int numValues = _singleValue ? _numDocs : _totalNumberOfEntries;
+    _useMMapBuffer = numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+
+    // Sorted columns should never need recreation of the forward index as the 
forwardIndexDisabled flag is treated as
+    // a no-op for sorted columns
+    File indexDir = segmentMetadata.getIndexDir();
+    String fileExtension;
+    if (_dictionaryEnabled) {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    } else {
+      fileExtension = _singleValue ? 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
+    _forwardIndexFile = new File(indexDir, columnName + fileExtension);
+    _forwardIndexValueBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _forwardIndexMaxSizeBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX);
+
+    // Create the temporary buffers needed
+    try {
+      _forwardIndexValueBuffer = createTempBuffer((long) numValues * 
Integer.BYTES, _forwardIndexValueBufferFile);
+      if (!_singleValue) {
+        _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexLengthBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index length buffer because we rely 
on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+        _forwardIndexMaxSizeBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexMaxSizeBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index max size buffer because we 
rely on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexMaxSizeBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+      }
+    } catch (Exception e) {
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      destroyBuffer(_forwardIndexMaxSizeBuffer, 
_forwardIndexMaxSizeBufferFile);
+      throw new IOException("Couldn't create temp buffers to construct forward 
index", e);
+    }
+  }
+
+  private ChunkCompressionType getColumnCompressionType() {
+    if (!_indexLoadingConfig.getNoDictionaryColumns().contains(_columnName)) {
+      return null;
+    }
+
+    ChunkCompressionType compressionType = null;
+    if (_indexLoadingConfig.getCompressionConfigs().containsKey(_columnName)) {
+      compressionType = 
_indexLoadingConfig.getCompressionConfigs().get(_columnName);
+    } else if 
(_indexLoadingConfig.getNoDictionaryConfig().containsKey(_columnName)) {
+      compressionType = 
ChunkCompressionType.valueOf(_indexLoadingConfig.getNoDictionaryConfig().get(_columnName));
+    }
+
+    // This logic to choose the default compressionType is duplicated from
+    // SegmentColumnarIndexCreator::getColumnCompressionType
+    if (compressionType == null) {
+      if (_columnMetadata.getFieldSpec().getFieldType() == 
FieldSpec.FieldType.METRIC) {
+        compressionType = ChunkCompressionType.PASS_THROUGH;
+      } else {
+        compressionType = ChunkCompressionType.LZ4;
+      }
+    }
+    return compressionType;
+  }
+
+  public void constructForwardIndexFromInvertedIndexAndDictionary()
+      throws IOException {
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, _columnName + ".fwd.inprogress");
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index if exists.
+      FileUtils.deleteQuietly(_forwardIndexFile);
+    }
+
+    // Create new forward index for the column.
+    LOGGER.info("Creating a new forward index for segment: {}, column: {}, 
isTemporary: {}", segmentName, _columnName,
+        _isTemporaryForwardIndex);
+
+    Map<String, String> metadataProperties;
+    if (_singleValue) {
+      metadataProperties = createForwardIndexForSVColumn();
+    } else {
+      metadataProperties = createForwardIndexForMVColumn();
+    }
+
+    LoaderUtils.writeIndexToV3Format(_segmentWriter, _columnName, 
_forwardIndexFile, ColumnIndexType.FORWARD_INDEX);
+
+    if (!_isTemporaryForwardIndex) {
+      // Only update the metadata and cleanup other indexes if the forward 
index to be created is permanent. If the
+      // forward index is temporary, it is meant to be used only for 
construction of other indexes and will be deleted
+      // once all the IndexHandlers have completed.
+      try {
+        LOGGER.info("Created forward index from inverted index and dictionary. 
Updating metadata properties for "
+            + "segment: {}, column: {}, property list: {}", segmentName, 
_columnName, metadataProperties);
+        
ForwardIndexHandler.updateMetadataProperties(_segmentMetadata.getIndexDir(), 
metadataProperties);
+      } catch (Exception e) {
+        throw new IOException(
+            String.format("Failed to update metadata properties for segment: 
%s, column: %s", segmentName, _columnName),
+            e);
+      }
+
+      if (!_dictionaryEnabled) {
+        LOGGER.info("Clean up indexes no longer needed or which need to be 
rewritten for segment: {}, column: {}",
+            segmentName, _columnName);
+        // Delete the dictionary
+        _segmentWriter.removeIndex(_columnName, ColumnIndexType.DICTIONARY);
+
+        // We remove indexes that have to be rewritten when a dictEnabled is 
toggled. Note that the respective index
+        // handler will take care of recreating the index.
+        ForwardIndexHandler.removeDictRelatedIndexes(_columnName, 
_segmentWriter);
+      }
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created a new forward index for segment: {}, column: {}, 
isTemporary: {}", segmentName, _columnName,
+        _isTemporaryForwardIndex);
+  }
+
+  private Map<String, String> createForwardIndexForSVColumn()
+      throws IOException {
+    try (BitmapInvertedIndexReader invertedIndexReader =
+        (BitmapInvertedIndexReader) 
LoaderUtils.getInvertedIndexReader(_segmentWriter, _columnMetadata);
+        Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, 
_columnMetadata)) {
+      boolean shouldTrackVarLengthMetadata = 
!_columnMetadata.getFieldSpec().getDataType().isFixedWidth();
+      int[] lengthOfLongestEntry = shouldTrackVarLengthMetadata ? new int[]{0} 
: new int[]{-1};
+      // Construct the forward index in the values buffer
+      for (int dictId = 0; dictId < _cardinality; dictId++) {
+        ImmutableRoaringBitmap docIdsBitmap = 
invertedIndexReader.getDocIds(dictId);
+        int finalDictId = dictId;

Review Comment:
   Curious why we need this temp assignment to `finalDictId`? Maybe I am 
missing something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to