This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 585e33338e Add immutable CLPForwardIndex creator and related classes
(#14288)
585e33338e is described below

commit 585e33338ec1e6030916717c101ab23a843bf019
Author: Jack Luo <jlu...@ext.uber.com>
AuthorDate: Mon Dec 2 16:42:46 2024 +0800

    Add immutable CLPForwardIndex creator and related classes (#14288)
    
    * Add initial implementation of CLPForwardIndexCreatorV2 and the associated 
reader, unit test, table config updates, etc.
    
    * Add apache license
    
    * Modify table config to recognize CLPV2 compression codec.
    
    * Add columnar ingestion code for CLP + some refactoring + bug fix
    
    * Remove efficient columnar ingestion.
    
    * Refactored out dead code.
    
    * Refactored javadoc
    
    * Refactored code.
---
 .../stats/MutableNoDictionaryColStatistics.java    |   9 +
 .../impl/forward/CLPMutableForwardIndexV2.java     |  37 +-
 .../creator/impl/fwd/CLPForwardIndexCreatorV2.java | 488 +++++++++++++++++++++
 .../creator/impl/stats/CLPStatsProvider.java       |  25 ++
 .../index/forward/ForwardIndexCreatorFactory.java  |   4 +
 .../index/forward/ForwardIndexReaderFactory.java   |   9 +
 .../segment/index/forward/ForwardIndexType.java    |   6 +-
 .../readers/forward/CLPForwardIndexReaderV2.java   | 260 +++++++++++
 .../segment/local/utils/TableConfigUtils.java      |   6 +-
 .../creator/CLPForwardIndexCreatorV2Test.java      |  92 ++++
 .../segment/spi/index/ForwardIndexConfig.java      |   1 +
 .../apache/pinot/spi/config/table/FieldConfig.java |   5 +-
 12 files changed, 925 insertions(+), 17 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 476bb5e10b..b8d77bd254 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -125,4 +125,13 @@ public class MutableNoDictionaryColStatistics implements 
ColumnStatistics, CLPSt
     throw new IllegalStateException(
         "CLP stats not available for column: " + 
_dataSourceMetadata.getFieldSpec().getName());
   }
+
+  @Override
+  public CLPV2Stats getCLPV2Stats() {
+    if (_forwardIndex instanceof CLPMutableForwardIndexV2) {
+      return ((CLPMutableForwardIndexV2) _forwardIndex).getCLPV2Stats();
+    }
+    throw new IllegalStateException(
+        "CLPV2 stats not available for column: " + 
_dataSourceMetadata.getFieldSpec().getName());
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
index 8cd940d2df..326f77ed4a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
@@ -105,8 +105,8 @@ import org.slf4j.LoggerFactory;
 public class CLPMutableForwardIndexV2 implements MutableForwardIndex {
   protected static final Logger LOGGER = 
LoggerFactory.getLogger(CLPMutableForwardIndexV2.class);
   public final String _columnName;
-
   protected final EncodedMessage _clpEncodedMessage;
+  protected final EncodedMessage _failToEncodeClpEncodedMessage;
   protected final MessageEncoder _clpMessageEncoder;
   protected final MessageDecoder _clpMessageDecoder;
 
@@ -135,7 +135,7 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
 
   // Various forward index and dictionary configurations with default values
   // TODO: Provide more optimized default values in the future
-  protected int _estimatedMaxDocCount = 4096;
+  protected int _estimatedMaxDocCount = 65536;
   protected int _rawMessageEstimatedAvgEncodedLength = 256;
   protected int _estimatedLogtypeAvgEncodedLength = 256;
   protected int _logtypeIdNumRowsPerChunk = _estimatedMaxDocCount;
@@ -150,7 +150,7 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
   protected int _encodedVarPerChunk = 256 * 1024;
 
   // Dynamic CLP dictionary encoding configs
-  protected int _minNumDocsBeforeCardinalityMonitoring = _estimatedMaxDocCount 
/ 16;
+  protected int _minNumDocsBeforeCardinalityMonitoring = _estimatedMaxDocCount 
/ 8;
   protected boolean _forceEnableClpEncoding = false;
   protected int _inverseLogtypeCardinalityRatioStopThreshold = 10;
   protected int _inverseDictVarCardinalityRatioStopThreshold = 10;
@@ -165,6 +165,14 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
     _clpMessageDecoder = new 
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
         BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
 
+    _failToEncodeClpEncodedMessage = new EncodedMessage();
+    try {
+      _clpMessageEncoder.encodeMessage("Failed to encode message", 
_failToEncodeClpEncodedMessage);
+    } catch (IOException ex) {
+      // Should not happen
+      throw new IllegalArgumentException("Failed to encode error message", ex);
+    }
+
     // Raw forward index stored as bytes
     _rawBytes = new VarByteSVMutableForwardIndex(FieldSpec.DataType.BYTES, 
memoryManager, columnName + "_rawBytes.fwd",
         _estimatedMaxDocCount, _rawMessageEstimatedAvgEncodedLength);
@@ -210,11 +218,14 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
   @Override
   public void setString(int docId, String value) {
     // docId is intentionally ignored because this forward index only supports sequential writes (append only)
+    EncodedMessage encodedMessage = _clpEncodedMessage;
     try {
-      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
-      appendEncodedMessage(_clpEncodedMessage);
+      _clpMessageEncoder.encodeMessage(value, encodedMessage);
     } catch (IOException e) {
-      throw new IllegalArgumentException("Failed to encode message: " + value, e);
+      // CLP could not encode the value: store a placeholder message instead of failing ingestion
+      encodedMessage = _failToEncodeClpEncodedMessage;
     }
+    // Deliberately not in a finally block: an unexpected (unchecked) encoder failure must propagate without
+    // appending a partially-encoded message, and an append failure must not mask the encoder's exception.
+    appendEncodedMessage(encodedMessage);
   }
 
@@ -226,11 +237,9 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
    * encoded message by replacing them with empty arrays, as Pinot does not 
accept null values.
    *
    * @param clpEncodedMessage The {@link EncodedMessage} to append.
-   * @throws IOException if an I/O error occurs during the appending process.
    */
-  public void appendEncodedMessage(@NotNull EncodedMessage clpEncodedMessage)
-      throws IOException {
-    if (_isClpEncoded) {
+  public void appendEncodedMessage(@NotNull EncodedMessage clpEncodedMessage) {
+    if (_isClpEncoded || _forceEnableClpEncoding) {
       _logtypeId.setInt(_nextDocId, 
_logtypeDict.index(clpEncodedMessage.getLogtype()));
 
       FlattenedByteArray flattenedDictVars = 
clpEncodedMessage.getDictionaryVarsAsFlattenedByteArray();
@@ -257,13 +266,13 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
       _encodedVarOffset.setInt(_nextDocId, _nextEncodedVarId);
 
       // Turn off clp encoding when dictionary size is exceeded
-      if (_nextDocId > _minNumDocsBeforeCardinalityMonitoring) {
+      if (_nextDocId > _minNumDocsBeforeCardinalityMonitoring && 
!_forceEnableClpEncoding) {
         int inverseLogtypeCardinalityRatio = _nextDocId / 
_logtypeDict.length();
         if (inverseLogtypeCardinalityRatio < 
_inverseLogtypeCardinalityRatioStopThreshold) {
           _isClpEncoded = false;
           _bytesRawFwdIndexDocIdStartOffset = _nextDocId + 1;
         } else if (_dictVarDict.length() > 0) {
-          int inverseDictVarCardinalityRatio = _nextDictVarDocId / 
_dictVarDict.length();
+          int inverseDictVarCardinalityRatio = Math.max(_nextDocId, 
_nextDictVarDocId) / _dictVarDict.length();
           if (inverseDictVarCardinalityRatio < 
_inverseDictVarCardinalityRatioStopThreshold) {
             _isClpEncoded = false;
             _bytesRawFwdIndexDocIdStartOffset = _nextDocId + 1;
@@ -444,6 +453,10 @@ public class CLPMutableForwardIndexV2 implements 
MutableForwardIndex {
         totalNumberOfEncodedVars, maxNumberOfEncodedVars);
   }
 
+  public CLPStatsProvider.CLPV2Stats getCLPV2Stats() {
+    return new CLPStatsProvider.CLPV2Stats(this);
+  }
+
   public String[] 
getSortedDictionaryValuesAsStrings(BytesOffHeapMutableDictionary dict, Charset 
charset) {
     // Adapted from StringOffHeapMutableDictionary#getSortedValues()
     int numValues = dict.length();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
new file mode 100644
index 0000000000..2a762d481d
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.segment.local.io.util.VarLengthValueWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
+import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BytesOffHeapMutableDictionary;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code CLPForwardIndexCreatorV2} is responsible for creating the final immutable forward index
+ * from the {@link CLPMutableForwardIndexV2}. This forward index can be either dictionary-encoded using CLP
+ * or raw-bytes-encoded, depending on the configuration and the characteristics of the data being processed.
+ *
+ * <p>Compared to the previous version, {@link CLPForwardIndexCreatorV1}, this V2 implementation introduces several
+ * key improvements:</p>
+ *
+ * <ol>
+ *   <li><strong>Improved Compression Ratio:</strong>
+ *   <p>Instead of using fixed-bit encoding (uncompressed), this version uses fixed-byte encoding with Zstandard
+ *   chunk compression for dictionary-encoded IDs. In real-world log data, particularly for dictionary-encoded
+ *   columns, the number of dictionary entries is often too large for fixed-bit encoding to achieve an optimal
+ *   compression ratio. Using fixed-byte encoding with Zstandard compression significantly improves the compression
+ *   ratio.</p>
+ *   </li>
+ *
+ *   <li><strong>Upgrade to V5 Writer Version:</strong>
+ *   <p>This version uses the V5 writer for the forward index, which was introduced to improve the compression ratio
+ *   for multi-value fixed-width data types (e.g., longs, ints). The compression efficiency of
+ *   {@code CLPForwardIndexCreatorV2} heavily relies on the optimal storage of multi-valued columns for dictionary
+ *   variable IDs and encoded variables.</p>
+ *   </li>
+ *
+ *   <li><strong>Reduced Serialization/Deserialization Overhead:</strong>
+ *   <p>The conversion from mutable to immutable forward indexes is significantly optimized. In
+ *   {@link CLPForwardIndexCreatorV1}, the conversion had to decode each row using CLP from the mutable forward index
+ *   and re-encode it, introducing non-trivial serialization and deserialization (serdes) overhead. The new
+ *   {@link CLPMutableForwardIndexV2} eliminates this process entirely when Pinot is configured for columnar segment
+ *   conversion (default config), avoiding the need for redundant decoding and re-encoding (implemented in a follow-up
+ *   PR). Row-based segment conversion serdes overhead can be reduced in a similar way, but was not implemented due to
+ *   lack of need. Additionally, primitive types (byte[]) are used for forward indexes to avoid boxing strings into
+ *   {@link String} objects, which improves both performance and memory efficiency (by reducing garbage collection
+ *   overhead on the heap).</p>
+ *   </li>
+ * </ol>
+ *
+ * <h3>Intermediate Files:</h3>
+ * <p>
+ * The class writes each sub-index to an intermediate file in a temporary directory next to the final index file.
+ * {@link #seal()} copies them into the final forward index file, and {@link #close()} deletes the temporary
+ * directory.
+ * </p>
+ *
+ * <h3>File Layout:</h3>
+ * <p>
+ * {@link #seal()} writes, in order: the length of {@link #MAGIC_BYTES} (int), the magic bytes, the format version
+ * (int, always 2) and an isClpEncoded flag (int, 1 or 0). When CLP-encoded, this is followed by the logtype
+ * dictionary size and the dictionary-variable dictionary size (ints), the byte lengths (ints) of the logtype
+ * dictionary, dictionary-variable dictionary, logtype ID forward index, dictionary-variable ID forward index and
+ * encoded-variable forward index, and finally the contents of those five files in the same order. Otherwise the
+ * header is followed by the byte length (int) of the raw-message forward index and its contents.
+ * </p>
+ *
+ * @see CLPMutableForwardIndexV2
+ * @see VarByteChunkForwardIndexWriterV5
+ * @see ForwardIndexCreator
+ */
+public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
+  public static final Logger LOGGER = LoggerFactory.getLogger(CLPForwardIndexCreatorV2.class);
+  public static final byte[] MAGIC_BYTES = "CLP.v2".getBytes(StandardCharsets.UTF_8);
+
+  // Format version written into the header by seal()
+  private static final int VERSION = 2;
+  // Target chunk size (1MB in bytes) for every chunk-based forward index writer created by this class
+  private static final int TARGET_CHUNK_SIZE = 1 << 20;
+
+  public final String _column;
+  private final int _numDoc;
+
+  private final File _intermediateFilesDir;
+  private final FileChannel _dataFile;
+  private final ByteBuffer _fileBuffer;
+
+  private final boolean _isClpEncoded;
+  private int _logtypeDictSize;
+  private File _logtypeDictFile;
+  private VarLengthValueWriter _logtypeDict;
+  private int _dictVarDictSize;
+  private File _dictVarDictFile;
+  private VarLengthValueWriter _dictVarDict;
+  private File _logtypeIdFwdIndexFile;
+  private FixedByteChunkForwardIndexWriter _logtypeIdFwdIndex;
+  private File _dictVarIdFwdIndexFile;
+  private VarByteChunkForwardIndexWriterV5 _dictVarIdFwdIndex;
+  private File _encodedVarFwdIndexFile;
+  private VarByteChunkForwardIndexWriterV5 _encodedVarFwdIndex;
+  private File _rawMsgFwdIndexFile;
+  private VarByteChunkForwardIndexWriterV5 _rawMsgFwdIndex;
+
+  private final EncodedMessage _clpEncodedMessage;
+  private final EncodedMessage _failToEncodeClpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+
+  private final BytesOffHeapMutableDictionary _mutableLogtypeDict;
+  private final BytesOffHeapMutableDictionary _mutableDictVarDict;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  /**
+   * Initializes a forward index creator for the given column using the provided base directory and column statistics.
+   * This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index
+   * constructors, it obtains the mutable forward index directly instead of relying only on the column statistics.
+   *
+   * <p>The {@code columnStatistics} object passed into this constructor must implement {@link CLPStatsProvider} and
+   * reference the mutable forward index ({@link CLPMutableForwardIndexV2}). The logtype and dictionary-variable
+   * dictionaries are copied from the mutable index here; per-document values are still supplied afterwards through
+   * {@link #putString(String)} (or {@link #appendEncodedMessage(EncodedMessage)}). Zstandard chunk compression is
+   * used.</p>
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param columnStatistics The column statistics containing the CLP forward index information, including a reference
+   *        to the mutable forward index.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
+      throws IOException {
+    this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
+        ChunkCompressionType.ZSTANDARD);
+  }
+
+  /**
+   * Initializes a forward index creator for the given column using the provided mutable forward index and
+   * compression type. CLP encoding is used if the mutable forward index is CLP-encoded.
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param clpMutableForwardIndex The mutable forward index containing the raw data to be ingested.
+   * @param chunkCompressionType The compression type to be used for encoding the forward index.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, CLPMutableForwardIndexV2 clpMutableForwardIndex,
+      ChunkCompressionType chunkCompressionType)
+      throws IOException {
+    this(baseIndexDir, clpMutableForwardIndex, chunkCompressionType, false);
+  }
+
+  /**
+   * Initializes a forward index creator for the given column using the provided mutable forward index, compression
+   * type, and an option to force raw encoding. If {@code forceRawEncoding} is true, the forward index will store raw
+   * bytes instead of using CLP encoding.
+   *
+   * <p>Note that although we already have access to all of the data in the mutable forward index in the constructor,
+   * we will not be performing the conversion from mutable forward index to immutable forward index here. The reason
+   * is that the docIDs may be reordered during the segment conversion phase for sorted tables. For row-based
+   * ingestion, the data is ingested via the {@code putString(String value)} method, which is a code path that we did
+   * not optimize. For optimal mutable to immutable forward index conversion performance, use columnar ingestion (the
+   * default config) in Pinot, which avoids the serdes overhead.</p>
+   *
+   * @param baseIndexDir The base directory where the forward index files will be stored.
+   * @param clpMutableForwardIndex The mutable forward index containing the raw data to be ingested.
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @param forceRawEncoding If true, raw bytes encoding will be used, bypassing CLP encoding.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, CLPMutableForwardIndexV2 clpMutableForwardIndex,
+      ChunkCompressionType chunkCompressionType, boolean forceRawEncoding)
+      throws IOException {
+    _chunkCompressionType = chunkCompressionType;
+
+    // CLP encoding objects required for row-based mutable to immutable forward index conversion. Set up first so
+    // that a failure here happens before any file is created.
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    _failToEncodeClpEncodedMessage = new EncodedMessage();
+    try {
+      _clpMessageEncoder.encodeMessage("Failed to encode message", _failToEncodeClpEncodedMessage);
+    } catch (IOException ex) {
+      // Should not happen
+      throw new IllegalArgumentException("Failed to encode error message", ex);
+    }
+
+    // Pick up metadata from the mutable forward index and use it to initialize the immutable forward index
+    _column = clpMutableForwardIndex.getColumnName();
+    _numDoc = clpMutableForwardIndex.getNumDoc();
+
+    _intermediateFilesDir =
+        new File(baseIndexDir, _column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    _isClpEncoded = !forceRawEncoding && clpMutableForwardIndex.isClpEncoded();
+    _mutableLogtypeDict = clpMutableForwardIndex.getLogtypeDict();
+    _mutableDictVarDict = clpMutableForwardIndex.getDictVarDict();
+    if (_isClpEncoded) {
+      initializeDictionaryEncodingMode(chunkCompressionType, _mutableLogtypeDict.length(),
+          _mutableDictVarDict.length());
+      // The dictionaries are fully known up front, so they are written out immediately
+      putLogtypeDict(_mutableLogtypeDict);
+      putDictVarDict(_mutableDictVarDict);
+    } else {
+      // Raw encoding
+      initializeRawEncodingMode(chunkCompressionType);
+    }
+
+    // The final file is mapped with the largest size a ByteBuffer can address; seal() truncates it to the number of
+    // bytes actually written. Close the channel if mapping fails so it is not leaked.
+    FileChannel dataFile =
+        new RandomAccessFile(new File(baseIndexDir, _column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+    try {
+      _fileBuffer = dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+    } catch (IOException | RuntimeException e) {
+      dataFile.close();
+      throw e;
+    }
+    _dataFile = dataFile;
+  }
+
+  /**
+   * Returns whether the current forward index is CLP-encoded.
+   *
+   * @return True if the forward index is CLP-encoded, false otherwise.
+   */
+  public boolean isClpEncoded() {
+    return _isClpEncoded;
+  }
+
+  /**
+   * Initializes the necessary components for raw encoding mode, i.e. the forward index file for raw message bytes.
+   * This method is called when CLP encoding is not used.
+   *
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  private void initializeRawEncodingMode(ChunkCompressionType chunkCompressionType)
+      throws IOException {
+    _rawMsgFwdIndexFile = new File(_intermediateFilesDir, _column + ".rawMsg");
+    _rawMsgFwdIndex = new VarByteChunkForwardIndexWriterV5(_rawMsgFwdIndexFile, chunkCompressionType, TARGET_CHUNK_SIZE);
+  }
+
+  /**
+   * Initializes the necessary components for dictionary encoding mode: the logtype and dictionary-variable
+   * dictionaries, and the forward index files for logtype IDs, dictionary variable IDs and encoded variables. This
+   * method is called when CLP encoding is used.
+   *
+   * @param chunkCompressionType The compression type used for encoding the forward index.
+   * @param logtypeDictSize The number of entries in the logtype dictionary.
+   * @param dictVarDictSize The number of entries in the dictionary-variable dictionary.
+   * @throws IOException If there is an error during initialization or while accessing the file system.
+   */
+  private void initializeDictionaryEncodingMode(ChunkCompressionType chunkCompressionType, int logtypeDictSize,
+      int dictVarDictSize)
+      throws IOException {
+    _logtypeDictFile = new File(_intermediateFilesDir, _column + ".lt.dict");
+    _logtypeDict = new VarLengthValueWriter(_logtypeDictFile, logtypeDictSize);
+    _logtypeDictSize = logtypeDictSize;
+    _logtypeIdFwdIndexFile = new File(_intermediateFilesDir, _column + ".lt.id");
+    _logtypeIdFwdIndex = new FixedByteChunkForwardIndexWriter(_logtypeIdFwdIndexFile, chunkCompressionType, _numDoc,
+        TARGET_CHUNK_SIZE / FieldSpec.DataType.INT.size(), FieldSpec.DataType.INT.size(),
+        VarByteChunkForwardIndexWriterV5.VERSION);
+
+    _dictVarDictFile = new File(_intermediateFilesDir, _column + ".var.dict");
+    _dictVarDict = new VarLengthValueWriter(_dictVarDictFile, dictVarDictSize);
+    _dictVarDictSize = dictVarDictSize;
+    // Kept in the intermediate directory like every other temp file, so it is isolated per segment and deleted by
+    // close() (resolving it against the still-null field would place it in the process working directory)
+    _dictVarIdFwdIndexFile = new File(_intermediateFilesDir, _column + ".dictVars");
+    _dictVarIdFwdIndex =
+        new VarByteChunkForwardIndexWriterV5(_dictVarIdFwdIndexFile, chunkCompressionType, TARGET_CHUNK_SIZE);
+
+    _encodedVarFwdIndexFile = new File(_intermediateFilesDir, _column + ".encodedVars");
+    _encodedVarFwdIndex =
+        new VarByteChunkForwardIndexWriterV5(_encodedVarFwdIndexFile, chunkCompressionType, TARGET_CHUNK_SIZE);
+  }
+
+  /**
+   * Writes every entry of the given logtype dictionary, in ID order, to the logtype dictionary file.
+   * Only valid in CLP-encoded mode.
+   *
+   * @param logtypeDict The mutable logtype dictionary to copy.
+   * @throws IOException If writing to the dictionary file fails.
+   */
+  public void putLogtypeDict(BytesOffHeapMutableDictionary logtypeDict)
+      throws IOException {
+    for (int i = 0; i < logtypeDict.length(); i++) {
+      _logtypeDict.add(logtypeDict.get(i));
+    }
+  }
+
+  /**
+   * Writes every entry of the given dictionary-variable dictionary, in ID order, to the dictionary-variable
+   * dictionary file. Only valid in CLP-encoded mode.
+   *
+   * @param dictVarDict The mutable dictionary-variable dictionary to copy.
+   * @throws IOException If writing to the dictionary file fails.
+   */
+  public void putDictVarDict(BytesOffHeapMutableDictionary dictVarDict)
+      throws IOException {
+    for (int i = 0; i < dictVarDict.length(); i++) {
+      _dictVarDict.add(dictVarDict.get(i));
+    }
+  }
+
+  /**
+   * Appends a string message to the forward indexes.
+   * This path is only intended to be used for row-based ingestion and pays the high cost of encoding and decoding.
+   * For optimal mutable to immutable forward index conversion performance, use columnar ingestion, which avoids the
+   * serdes overhead. TODO: add the code in a separate PR to simplify the review process
+   *
+   * <p>If CLP fails to encode the value, a placeholder "Failed to encode message" entry is stored instead.
+   * NOTE(review): in raw encoding mode this placeholder also replaces the raw value, even though raw storage does not
+   * need CLP encoding — confirm this is intended.</p>
+   *
+   * @param value The string value to append
+   */
+  @Override
+  public void putString(String value) {
+    EncodedMessage encodedMessage = _clpEncodedMessage;
+    try {
+      _clpMessageEncoder.encodeMessage(value, encodedMessage);
+    } catch (IOException e) {
+      // Store a placeholder message rather than failing the whole segment if CLP cannot encode this value
+      encodedMessage = _failToEncodeClpEncodedMessage;
+    }
+    // Deliberately not in a finally block: an unexpected (unchecked) encoder failure must propagate without
+    // appending a partially-encoded message, and an append failure must not mask the encoder's exception.
+    appendEncodedMessage(encodedMessage);
+  }
+
+  /**
+   * Appends an encoded message to the forward indexes. In CLP-encoded mode the logtype and dictionary variables are
+   * translated to IDs through the mutable dictionaries; otherwise the raw message bytes are stored.
+   *
+   * <p>NOTE(review): assumes every logtype / dictionary variable seen here already exists in the mutable dictionaries
+   * that were copied in the constructor; otherwise the IDs written would not be present in the serialized
+   * dictionaries — confirm.</p>
+   *
+   * @param clpEncodedMessage The encoded message to append, must not be null.
+   */
+  public void appendEncodedMessage(@NotNull EncodedMessage clpEncodedMessage) {
+    if (_isClpEncoded) {
+      // Logtype
+      _logtypeIdFwdIndex.putInt(_mutableLogtypeDict.index(clpEncodedMessage.getLogtype()));
+
+      // DictVarIds (an empty array is stored instead of null because Pinot does not accept null values)
+      byte[][] dictVars = clpEncodedMessage.getDictionaryVarsAsByteArrays();
+      if (null == dictVars || 0 == dictVars.length) {
+        _dictVarIdFwdIndex.putIntMV(ArrayUtils.EMPTY_INT_ARRAY);
+      } else {
+        int[] dictVarIds = new int[dictVars.length];
+        for (int i = 0; i < dictVars.length; i++) {
+          dictVarIds[i] = _mutableDictVarDict.index(dictVars[i]);
+        }
+        _dictVarIdFwdIndex.putIntMV(dictVarIds);
+      }
+
+      // EncodedVars
+      long[] encodedVars = clpEncodedMessage.getEncodedVars();
+      if (null == encodedVars || 0 == encodedVars.length) {
+        _encodedVarFwdIndex.putLongMV(ArrayUtils.EMPTY_LONG_ARRAY);
+      } else {
+        _encodedVarFwdIndex.putLongMV(encodedVars);
+      }
+    } else {
+      _rawMsgFwdIndex.putBytes(clpEncodedMessage.getMessage());
+    }
+  }
+
+  /**
+   * Seals the forward index: closes all intermediate writers, writes the header and every intermediate file into the
+   * memory-mapped data file (see the class documentation for the layout), and truncates the data file to the number
+   * of bytes written. Intermediate files are only deleted by {@link #close()}.
+   *
+   * @throws RuntimeException If closing the intermediate writers or writing the data file fails.
+   */
+  @Override
+  public void seal() {
+    closeIntermediateWriters();
+    try {
+      // Header
+      _fileBuffer.putInt(MAGIC_BYTES.length);
+      _fileBuffer.put(MAGIC_BYTES);
+      _fileBuffer.putInt(VERSION);
+      _fileBuffer.putInt(_isClpEncoded ? 1 : 0); // isClpEncoded
+
+      if (_isClpEncoded) {
+        _fileBuffer.putInt(_logtypeDictSize);
+        _fileBuffer.putInt(_dictVarDictSize);
+        writeSections(_logtypeDictFile, _dictVarDictFile, _logtypeIdFwdIndexFile, _dictVarIdFwdIndexFile,
+            _encodedVarFwdIndexFile);
+      } else {
+        writeSections(_rawMsgFwdIndexFile);
+      }
+
+      // The buffer was mapped from offset 0, so its position is exactly the number of bytes written
+      _dataFile.truncate(_fileBuffer.position());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to seal forward indexes for column: " + _column, e);
+    }
+  }
+
+  /**
+   * Closes the intermediate writers of the active encoding mode so their files are complete on disk.
+   *
+   * @throws RuntimeException If any writer fails to close.
+   */
+  private void closeIntermediateWriters() {
+    if (_isClpEncoded) {
+      try {
+        _logtypeDict.close();
+        _logtypeIdFwdIndex.close();
+        _dictVarDict.close();
+        _dictVarIdFwdIndex.close();
+        _encodedVarFwdIndex.close();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to close dictionaries and forward indexes for column: " + _column, e);
+      }
+    } else {
+      try {
+        _rawMsgFwdIndex.close();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to close raw message forward index for column: " + _column, e);
+      }
+    }
+  }
+
+  /**
+   * Writes the byte length (int) of every given file, followed by the contents of every file, in argument order.
+   *
+   * @param sections The intermediate files to write.
+   * @throws IOException If a file cannot be read.
+   */
+  private void writeSections(File... sections)
+      throws IOException {
+    for (File section : sections) {
+      _fileBuffer.putInt(fileLengthAsInt(section));
+    }
+    for (File section : sections) {
+      copyFileIntoBuffer(section);
+    }
+  }
+
+  /**
+   * Returns the length of the given file as an int, failing loudly instead of silently truncating the length
+   * recorded in the header.
+   */
+  private static int fileLengthAsInt(File file) {
+    long length = file.length();
+    if (length > Integer.MAX_VALUE) {
+      throw new IllegalStateException("Intermediate file: " + file + " is too large (" + length + " bytes)");
+    }
+    return (int) length;
+  }
+
+  /**
+   * Closes the forward index creator, deleting all intermediate files and releasing the data file. The data file is
+   * closed even if deleting the intermediate files fails.
+   *
+   * @throws IOException If there is an error while deleting the intermediate files or closing the data file.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      // Delete all temp files
+      FileUtils.deleteDirectory(_intermediateFilesDir);
+    } finally {
+      _dataFile.close();
+    }
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  /**
+   * Copies the contents of the given file into the memory-mapped buffer at its current position.
+   *
+   * @param file The file to be copied into the memory-mapped buffer.
+   * @throws IOException If there is an error while reading the file or writing to the buffer.
+   */
+  private void copyFileIntoBuffer(File file)
+      throws IOException {
+    try (FileChannel from = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      _fileBuffer.put(from.map(FileChannel.MapMode.READ_ONLY, 0, file.length()));
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
index b861188620..c1776a61fb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
@@ -18,10 +18,18 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
+
+
 public interface CLPStatsProvider {
 
   CLPStats getCLPStats();
 
+  /**
+   * Returns the {@link CLPV2Stats} of the implementing provider.
+   *
+   * <p>The default implementation is not usable; it is meant to be overridden only by
+   * {@code MutableNoDictionaryColStatistics}.
+   *
+   * @throws IllegalStateException always, unless overridden
+   */
+  default CLPV2Stats getCLPV2Stats() {
+    String message = "This method should only be implemented and used in MutableNoDictionaryColStatistics class.";
+    throw new IllegalStateException(message);
+  }
+
   class CLPStats {
     int _totalNumberOfDictVars = 0;
     int _totalNumberOfEncodedVars = 0;
@@ -63,4 +71,21 @@ public interface CLPStatsProvider {
       return _sortedDictVarValues;
     }
   }
+
+  /**
+   * CLPV2Stats holds a reference to a {@link CLPMutableForwardIndexV2}. In CLP V2 forward indexes, this reference is
+   * used when converting a mutable forward index to an immutable one, so that the conversion can try to bypass the
+   * need to decode and re-encode the CLP-encoded data.
+   */
+  class CLPV2Stats {
+    private final CLPMutableForwardIndexV2 _clpMutableForwardIndexV2;
+
+    /**
+     * @param clpMutableForwardIndexV2 The mutable CLP V2 forward index these stats refer to.
+     */
+    public CLPV2Stats(CLPMutableForwardIndexV2 clpMutableForwardIndexV2) {
+      _clpMutableForwardIndexV2 = clpMutableForwardIndexV2;
+    }
+
+    /**
+     * @return The mutable CLP V2 forward index supplied at construction time.
+     */
+    public CLPMutableForwardIndexV2 getClpMutableForwardIndexV2() {
+      return _clpMutableForwardIndexV2;
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index 7d51a09a3b..87cb726222 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
 import java.io.File;
 import java.io.IOException;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
@@ -74,6 +75,9 @@ public class ForwardIndexCreatorFactory {
       if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLP) {
         return new CLPForwardIndexCreatorV1(indexDir, columnName, 
numTotalDocs, context.getColumnStatistics());
       }
+      if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLPV2) {
+        return new CLPForwardIndexCreatorV2(indexDir, 
context.getColumnStatistics());
+      }
       ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
       if (chunkCompressionType == null) {
         chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index cc7201ed98..59a69047c0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -23,7 +23,9 @@ import java.util.Arrays;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
@@ -95,6 +97,13 @@ public class ForwardIndexReaderFactory extends 
IndexReaderFactory.Default<Forwar
           return new CLPForwardIndexReaderV1(dataBuffer, 
metadata.getTotalDocs());
         }
       }
+      if (dataBuffer.size() >= CLPForwardIndexCreatorV2.MAGIC_BYTES.length) {
+        byte[] magicBytes = new 
byte[CLPForwardIndexCreatorV2.MAGIC_BYTES.length];
+        dataBuffer.copyTo(0, magicBytes);
+        if (Arrays.equals(magicBytes, CLPForwardIndexCreatorV2.MAGIC_BYTES)) {
+          return new CLPForwardIndexReaderV2(dataBuffer, 
metadata.getTotalDocs());
+        }
+      }
       return createRawIndexReader(dataBuffer, 
metadata.getDataType().getStoredType(), metadata.isSingleValue());
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index 74b0b49c8b..03ed28b2f0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -253,9 +253,13 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
           if (config.getCompressionCodec() == CompressionCodec.CLP) {
             CLPMutableForwardIndexV2 clpMutableForwardIndex =
                 new CLPMutableForwardIndexV2(column, 
context.getMemoryManager());
-            // TODO: enable config to invoke forceClpDictionaryEncoding() 
on-demand
+            // CLP (V1) always has CLP encoding enabled, whereas for CLPV2 it is decided dynamically
             clpMutableForwardIndex.forceClpEncoding();
             return clpMutableForwardIndex;
+          } else if (config.getCompressionCodec() == CompressionCodec.CLPV2) {
+            CLPMutableForwardIndexV2 clpMutableForwardIndex =
+                new CLPMutableForwardIndexV2(column, 
context.getMemoryManager());
+            return clpMutableForwardIndex;
           }
           return new VarByteSVMutableForwardIndex(storedType, 
context.getMemoryManager(), allocationContext,
               initialCapacity, 
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
new file mode 100644
index 0000000000..63314408de
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.FlattenedByteArrayFactory;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * {@code CLPForwardIndexReaderV2} is a forward index reader for CLP-encoded 
forward indexes. It supports reading both
+ * CLP-encoded and raw message forward indexes created by {@link 
CLPForwardIndexCreatorV2}.
+ *
+ * <p>This class supports two modes of reading:
+ * <ul>
+ *   <li>**CLP-encoded forward index**: Reads compressed log messages that are 
stored using a combination of logtype
+ *   dictionaries, dictionary variables, and encoded variables.</li>
+ *   <li>**Raw message forward index**: Reads raw log messages stored as byte 
arrays without any CLP encoding.</li>
+ * </ul>
+ *
+ * The constructor of this class reads and validates the forward index from a 
{@link PinotDataBuffer}, and based on the
+ * metadata, it initializes the appropriate readers for either CLP-encoded or 
raw messages.
+ *
+ * @see CLPForwardIndexCreatorV2
+ */
+public class CLPForwardIndexReaderV2 implements 
ForwardIndexReader<CLPForwardIndexReaderV2.CLPReaderContext> {
+  private final int _version;
+  private final int _numDocs;
+  private final boolean _isClpEncoded;
+
+  private VarLengthValueReader _logTypeDictReader;
+  private VarLengthValueReader _dictVarDictReader;
+  private int _logtypeDictNumBytesPerValue;
+  private int _dictVarDictNumBytesPerValue;
+
+  private FixedBytePower2ChunkSVForwardIndexReader _logTypeIdFwdIndexReader;
+  private VarByteChunkForwardIndexReaderV5 _dictVarIdFwdIndexReader;
+  private VarByteChunkForwardIndexReaderV5 _encodedVarFwdIndexReader;
+  private VarByteChunkForwardIndexReaderV5 _rawMsgFwdIndexReader;
+  private MessageDecoder _clpMessageDecoder;
+
+  /**
+   * Constructs a {@code CLPForwardIndexReaderV2} for reading the forward 
index from the given {@link PinotDataBuffer}.
+   *
+   * <p>This constructor reads the metadata from the data buffer and 
initializes the appropriate readers for either
+   * CLP-encoded or raw message forward indexes.</p>
+   *
+   * @param pinotDataBuffer The data buffer containing the forward index.
+   * @param numDocs The number of documents in the forward index.
+   * @throws UnsupportedOperationException If the magic bytes do not match the 
expected CLP forward index format.
+   */
+  public CLPForwardIndexReaderV2(PinotDataBuffer pinotDataBuffer, int numDocs) 
{
+    _numDocs = numDocs;
+    int offset = 0;
+    int magicBytesLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    byte[] magicBytes = new byte[magicBytesLength];
+    pinotDataBuffer.copyTo(offset, magicBytes);
+
+    // Validate against supported version
+    if (!Arrays.equals(magicBytes, CLPForwardIndexCreatorV2.MAGIC_BYTES)) {
+      throw new UnsupportedOperationException("Unsupported magic bytes");
+    }
+    offset += CLPForwardIndexCreatorV2.MAGIC_BYTES.length;
+
+    _version = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    _isClpEncoded = pinotDataBuffer.getInt(offset) == 1;   // 1 -> true, 0 -> 
false
+    offset += Integer.BYTES;
+
+    if (_isClpEncoded) {
+      int logtypeDictSize = pinotDataBuffer.getInt(offset);
+      _logtypeDictNumBytesPerValue = 
PinotDataBitSet.getNumBitsPerValue(logtypeDictSize - 1);
+      offset += Integer.BYTES;
+
+      int dictVarDictSize = pinotDataBuffer.getInt(offset);
+      _dictVarDictNumBytesPerValue = 
PinotDataBitSet.getNumBitsPerValue(dictVarDictSize - 1);
+      offset += Integer.BYTES;
+
+      int logtypeDictLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+      int dictVarDictLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+      int logtypeIdFwdIndexLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+      int dictVarIdFwdIndexLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+      int encodedVarFwdIndexLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+
+      _logTypeDictReader = new 
VarLengthValueReader(pinotDataBuffer.view(offset, offset + logtypeDictLength));
+      offset += logtypeDictLength;
+
+      _dictVarDictReader = new 
VarLengthValueReader(pinotDataBuffer.view(offset, offset + dictVarDictLength));
+      offset += dictVarDictLength;
+
+      _logTypeIdFwdIndexReader =
+          new 
FixedBytePower2ChunkSVForwardIndexReader(pinotDataBuffer.view(offset, offset + 
logtypeIdFwdIndexLength),
+              FieldSpec.DataType.INT);
+      offset += logtypeIdFwdIndexLength;
+
+      _dictVarIdFwdIndexReader =
+          new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, 
offset + dictVarIdFwdIndexLength),
+              FieldSpec.DataType.INT, false);
+      offset += dictVarIdFwdIndexLength;
+
+      _encodedVarFwdIndexReader =
+          new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, 
offset + encodedVarFwdIndexLength),
+              FieldSpec.DataType.LONG, false);
+      offset += encodedVarFwdIndexLength;
+
+      _clpMessageDecoder = new 
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+          BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    } else {
+      int rawMsgFwdIndexLength = pinotDataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+
+      _rawMsgFwdIndexReader =
+          new VarByteChunkForwardIndexReaderV5(pinotDataBuffer.view(offset, 
offset + rawMsgFwdIndexLength),
+              FieldSpec.DataType.BYTES, false);
+      offset += rawMsgFwdIndexLength;
+    }
+  }
+
+  /**
+   * Creates a new {@code CLPReaderContext} for reading data from the forward 
index.
+   *
+   * @return A new {@code CLPReaderContext} initialized with the appropriate 
reader contexts for the forward index.
+   */
+  public CLPForwardIndexReaderV2.CLPReaderContext createContext() {
+    if (_isClpEncoded) {
+      return new CLPReaderContext(_logTypeIdFwdIndexReader.createContext(), 
_dictVarIdFwdIndexReader.createContext(),
+          _encodedVarFwdIndexReader.createContext());
+    } else {
+      return new CLPReaderContext(_rawMsgFwdIndexReader.createContext());
+    }
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getStoredType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public String getString(int docId, CLPReaderContext context) {
+    if (_isClpEncoded) {
+      byte[] logtype =
+          _logTypeDictReader.getBytes(_logTypeIdFwdIndexReader.getInt(docId, 
context._logTypeIdReaderContext),
+              _logtypeDictNumBytesPerValue);
+
+      int[] dictVarIds = _dictVarIdFwdIndexReader.getIntMV(docId, 
context._dictVarIdReaderContext);
+      byte[][] dictVars = new byte[dictVarIds.length][];
+      for (int i = 0; i < dictVars.length; i++) {
+        dictVars[i] = _dictVarDictReader.getBytes(dictVarIds[i], 
_dictVarDictNumBytesPerValue);
+      }
+
+      long[] encodedVars = _encodedVarFwdIndexReader.getLongMV(docId, 
context._encodedVarReaderContext);
+      try {
+        return _clpMessageDecoder.decodeMessage(logtype, 
FlattenedByteArrayFactory.fromByteArrays(dictVars),
+            encodedVars);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      byte[] rawMsg = _rawMsgFwdIndexReader.getBytes(docId, 
context._rawMsgReaderContext);
+      return new String(rawMsg, StandardCharsets.UTF_8);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  /**
+   * The {@code CLPReaderContext} is a context class used to hold 
reader-specific state during forward index reading.
+   * It contains references to reader contexts for logtype IDs, dictionary 
variable IDs, encoded variables, or raw
+   * messages.
+   */
+  public static final class CLPReaderContext implements 
ForwardIndexReaderContext {
+    private final ChunkReaderContext _logTypeIdReaderContext;
+    private final VarByteChunkForwardIndexReaderV5.ReaderContext 
_dictVarIdReaderContext;
+    private final VarByteChunkForwardIndexReaderV5.ReaderContext 
_encodedVarReaderContext;
+    private final VarByteChunkForwardIndexReaderV4.ReaderContext 
_rawMsgReaderContext;
+
+    public CLPReaderContext(ChunkReaderContext logTypeIdReaderContext,
+        VarByteChunkForwardIndexReaderV5.ReaderContext dictVarIdReaderContext,
+        VarByteChunkForwardIndexReaderV5.ReaderContext 
encodedVarReaderContext) {
+      this(logTypeIdReaderContext, dictVarIdReaderContext, 
encodedVarReaderContext, null);
+    }
+
+    public CLPReaderContext(VarByteChunkForwardIndexReaderV4.ReaderContext 
rawMsgReaderContext) {
+      this(null, null, null, rawMsgReaderContext);
+    }
+
+    public CLPReaderContext(ChunkReaderContext logTypeIdReaderContext,
+        VarByteChunkForwardIndexReaderV5.ReaderContext dictVarIdReaderContext,
+        VarByteChunkForwardIndexReaderV5.ReaderContext encodedVarReaderContext,
+        VarByteChunkForwardIndexReaderV4.ReaderContext rawMsgReaderContext) {
+      _logTypeIdReaderContext = logTypeIdReaderContext;
+      _dictVarIdReaderContext = dictVarIdReaderContext;
+      _encodedVarReaderContext = encodedVarReaderContext;
+      _rawMsgReaderContext = rawMsgReaderContext;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      if (null != _logTypeIdReaderContext) {
+        _logTypeIdReaderContext.close();
+      }
+      if (null != _dictVarIdReaderContext) {
+        _dictVarIdReaderContext.close();
+      }
+      if (null != _encodedVarReaderContext) {
+        _encodedVarReaderContext.close();
+      }
+      if (null != _rawMsgReaderContext) {
+        _rawMsgReaderContext.close();
+      }
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 722fb3e4b9..90bae24cfa 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1204,9 +1204,11 @@ public final class TableConfigUtils {
       switch (encodingType) {
         case RAW:
           Preconditions.checkArgument(compressionCodec == null || 
compressionCodec.isApplicableToRawIndex()
-                  || compressionCodec == CompressionCodec.CLP, "Compression 
codec: %s is not applicable to raw index",
+                  || compressionCodec == CompressionCodec.CLP || 
compressionCodec == CompressionCodec.CLPV2,
+              "Compression codec: %s is not applicable to raw index",
               compressionCodec);
-          if (compressionCodec == CompressionCodec.CLP && schema != null) {
+          if ((compressionCodec == CompressionCodec.CLP || compressionCodec == 
CompressionCodec.CLPV2)
+              && schema != null) {
             Preconditions.checkArgument(
                 
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == 
DataType.STRING,
                 "CLP compression codec can only be applied to string columns");
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
new file mode 100644
index 0000000000..c66ea2f3ae
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import 
org.apache.pinot.segment.local.segment.index.forward.mutable.VarByteSVMutableForwardIndexTest;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPForwardIndexCreatorV2Test {
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), 
CLPForwardIndexCreatorV2Test.class.getSimpleName());
+  private PinotDataBufferMemoryManager _memoryManager;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+    _memoryManager = new 
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+  }
+
+  @Test
+  public void testCLPWriter()
+      throws IOException {
+    List<String> logLines = new ArrayList<>();
+    logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+        + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property 
LiveInstance took 5 ms. Selective: true");
+    logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+        + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property 
LiveInstance took 4 ms. Selective: true");
+    logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] 
Handled request from 0.0"
+        + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
+    logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] 
Handled request from 0.0"
+        + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
+        + "application/json status code 200 OK");
+    logLines.add("null");
+
+    // Create and ingest into a clp mutable forward indexes
+    CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new 
CLPMutableForwardIndexV2("column1", _memoryManager);
+    for (int i = 0; i < logLines.size(); i++) {
+      clpMutableForwardIndexV2.setString(i, logLines.get(i));
+    }
+
+    // Create a immutable forward index from mutable forward index
+    CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
+        new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, 
ChunkCompressionType.ZSTANDARD);
+    for (int i = 0; i < logLines.size(); i++) {
+      
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
+    }
+    clpForwardIndexCreatorV2.seal();
+    clpForwardIndexCreatorV2.close();
+
+    // Read from immutable forward index and validate the content
+    File indexFile = new File(TEMP_DIR, "column1" + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    PinotDataBuffer pinotDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+    CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new 
CLPForwardIndexReaderV2(pinotDataBuffer, logLines.size());
+    CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context = 
clpForwardIndexReaderV2.createContext();
+    for (int i = 0; i < logLines.size(); i++) {
+      Assert.assertEquals(clpForwardIndexReaderV2.getString(i, 
clpForwardIndexReaderV2Context), logLines.get(i));
+    }
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 1e713bec02..89b5a95d4f 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -75,6 +75,7 @@ public class ForwardIndexConfig extends IndexConfig {
       switch (compressionCodec) {
         case PASS_THROUGH:
         case CLP:
+        case CLPV2:
           _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
           _dictIdCompressionType = null;
           break;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index a88f1ffb7c..3a5eaf775a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -142,8 +142,9 @@ public class FieldConfig extends BaseJsonConfig {
     MV_ENTRY_DICT(false, true),
 
     // CLP is a special type of compression codec that isn't generally 
applicable to all RAW columns and has a special
-    // handling for log lines (see {@link CLPForwardIndexCreatorV1})
-    CLP(false, false);
+    // handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link CLPForwardIndexCreatorV2})
+    CLP(false, false),
+    CLPV2(false, false);
     //@formatter:on
 
     private final boolean _applicableToRawIndex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to