This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b787ad43a1 Add CLPV2_ZSTD and CLPV2_LZ4 raw forward index compression 
codecs. (#14661)
b787ad43a1 is described below

commit b787ad43a173500c81bce779233cc7fe85aa8cd2
Author: Jack Luo <jack....@mail.utoronto.ca>
AuthorDate: Tue Jan 14 03:41:43 2025 +0800

    Add CLPV2_ZSTD and CLPV2_LZ4 raw forward index compression codecs. (#14661)
---
 .../creator/impl/fwd/CLPForwardIndexCreatorV2.java | 25 +++++++++++++++++-----
 .../index/forward/ForwardIndexCreatorFactory.java  |  8 +++++++
 .../segment/index/forward/ForwardIndexType.java    |  4 +++-
 .../segment/local/utils/TableConfigUtils.java      |  6 ++++--
 .../creator/CLPForwardIndexCreatorV2Test.java      | 12 +++++------
 .../segment/spi/index/ForwardIndexConfig.java      |  2 ++
 .../apache/pinot/spi/config/table/FieldConfig.java |  5 ++++-
 7 files changed, 47 insertions(+), 15 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
index 2a762d481d..539acd26b1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
@@ -129,9 +129,10 @@ public class CLPForwardIndexCreatorV2 implements 
ForwardIndexCreator {
   private final ChunkCompressionType _chunkCompressionType;
 
   /**
-   * Initializes a forward index creator for the given column using the 
provided base directory and column statistics.
-   * This constructor is specifically used by {@code 
ForwardIndexCreatorFactory}. Unlike other immutable forward index
-   * constructors, this one handles the entire process of converting a mutable 
forward index into an immutable one.
+   * Initializes a forward index creator for the given column using the 
provided base directory, column statistics and
+   * chunk compressor type. This constructor is specifically used by {@code 
ForwardIndexCreatorFactory}. Unlike other
+   * immutable forward index constructors, this one handles the entire process 
of converting a mutable forward index
+   * into an immutable one.
    *
    * <p>The {@code columnStatistics} object passed into this constructor 
should contain a reference to the mutable
    * forward index ({@link CLPMutableForwardIndexV2}). The data from the 
mutable index is efficiently copied over
@@ -142,12 +143,26 @@ public class CLPForwardIndexCreatorV2 implements 
ForwardIndexCreator {
    * @param baseIndexDir The base directory where the forward index files will 
be stored.
    * @param columnStatistics The column statistics containing the CLP forward 
index information, including a reference
    *        to the mutable forward index.
+   * @param chunkCompressionType The chunk compressor type used to compress 
internal data columns
    * @throws IOException If there is an error during initialization or while 
accessing the file system.
    */
-  public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics 
columnStatistics)
+  public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics 
columnStatistics,
+      ChunkCompressionType chunkCompressionType)
       throws IOException {
     this(baseIndexDir, ((CLPStatsProvider) 
columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
-        ChunkCompressionType.ZSTANDARD);
+        chunkCompressionType);
+  }
+
+  /**
+   * Same as the constructor above, but the chunk compressor defaults to ZStandard.
+   * @param baseIndexDir The base directory where the forward index files will 
be stored.
+   * @param columnStatistics The column statistics containing the CLP forward 
index information, including a reference
+   *        to the mutable forward index.
+   * @throws IOException If there is an error during initialization or while 
accessing the file system.
+   */
+  public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics 
columnStatistics)
+      throws IOException {
+    this(baseIndexDir, columnStatistics, ChunkCompressionType.ZSTANDARD);
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index 87cb726222..6084c77b4e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -73,11 +73,19 @@ public class ForwardIndexCreatorFactory {
       // Dictionary disabled columns
       DataType storedType = fieldSpec.getDataType().getStoredType();
       if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLP) {
+        // CLP (V1) uses hard-coded chunk compressor which is set to 
`PassThrough`
         return new CLPForwardIndexCreatorV1(indexDir, columnName, 
numTotalDocs, context.getColumnStatistics());
       }
       if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLPV2) {
+        // Use the default chunk compression codec for CLP, currently 
configured to use ZStandard
         return new CLPForwardIndexCreatorV2(indexDir, 
context.getColumnStatistics());
       }
+      if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLPV2_ZSTD) {
+        return new CLPForwardIndexCreatorV2(indexDir, 
context.getColumnStatistics(), ChunkCompressionType.ZSTANDARD);
+      }
+      if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLPV2_LZ4) {
+        return new CLPForwardIndexCreatorV2(indexDir, 
context.getColumnStatistics(), ChunkCompressionType.LZ4);
+      }
       ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
       if (chunkCompressionType == null) {
         chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index c9b49bbc36..c23dac3f91 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -256,7 +256,9 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
             // CLP (V1) always has clp encoding enabled whereas V2 is dynamic
             clpMutableForwardIndex.forceClpEncoding();
             return clpMutableForwardIndex;
-          } else if (config.getCompressionCodec() == CompressionCodec.CLPV2) {
+          } else if (config.getCompressionCodec() == CompressionCodec.CLPV2
+              || config.getCompressionCodec() == CompressionCodec.CLPV2_ZSTD
+              || config.getCompressionCodec() == CompressionCodec.CLPV2_LZ4) {
             CLPMutableForwardIndexV2 clpMutableForwardIndex =
                 new CLPMutableForwardIndexV2(column, 
context.getMemoryManager());
             return clpMutableForwardIndex;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ad792016c3..ddab356085 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1207,10 +1207,12 @@ public final class TableConfigUtils {
       switch (encodingType) {
         case RAW:
           Preconditions.checkArgument(compressionCodec == null || 
compressionCodec.isApplicableToRawIndex()
-                  || compressionCodec == CompressionCodec.CLP || 
compressionCodec == CompressionCodec.CLPV2,
+                  || compressionCodec == CompressionCodec.CLP || 
compressionCodec == CompressionCodec.CLPV2
+                  || compressionCodec == CompressionCodec.CLPV2_ZSTD || 
compressionCodec == CompressionCodec.CLPV2_LZ4,
               "Compression codec: %s is not applicable to raw index",
               compressionCodec);
-          if ((compressionCodec == CompressionCodec.CLP || compressionCodec == 
CompressionCodec.CLPV2)
+          if ((compressionCodec == CompressionCodec.CLP || compressionCodec == 
CompressionCodec.CLPV2
+              || compressionCodec == CompressionCodec.CLPV2_ZSTD || 
compressionCodec == CompressionCodec.CLPV2_LZ4)
               && schema != null) {
             Preconditions.checkArgument(
                 
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == 
DataType.STRING,
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
index 32732e4cad..65152152e4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
@@ -114,12 +114,12 @@ public class CLPForwardIndexCreatorV2Test {
     Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD 
>= 0.19);
   }
 
-  private long createStringRawForwardIndex(ChunkCompressionType 
compressionType, int maxLength)
+  private long createStringRawForwardIndex(ChunkCompressionType 
chunkCompressionType, int maxLength)
       throws IOException {
     // Create a raw string immutable forward index
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
     SingleValueVarByteRawIndexCreator index =
-        new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, 
COLUMN_NAME, _logMessages.size(),
+        new SingleValueVarByteRawIndexCreator(TEMP_DIR, chunkCompressionType, 
COLUMN_NAME, _logMessages.size(),
             FieldSpec.DataType.STRING, maxLength);
     for (String logMessage : _logMessages) {
       index.putString(logMessage);
@@ -132,9 +132,9 @@ public class CLPForwardIndexCreatorV2Test {
   }
 
   private long 
createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 
clpMutableForwardIndexV2,
-      ChunkCompressionType compressionType)
+      ChunkCompressionType chunkCompressionType)
       throws IOException {
-    long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, 
compressionType);
+    long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, 
chunkCompressionType);
 
     // Read from immutable forward index and validate the content
     File indexFile = new File(TEMP_DIR, COLUMN_NAME + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
@@ -149,12 +149,12 @@ public class CLPForwardIndexCreatorV2Test {
   }
 
   private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 
clpMutableForwardIndexV2,
-      ChunkCompressionType compressionType)
+      ChunkCompressionType chunkCompressionType)
       throws IOException {
     // Create a CLP immutable forward index from mutable forward index
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
     CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
-        new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, 
compressionType);
+        new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, 
chunkCompressionType);
     for (int i = 0; i < _logMessages.size(); i++) {
       
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
     }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index fe2cfbbd2e..b2a794ac2a 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -116,6 +116,8 @@ public class ForwardIndexConfig extends IndexConfig {
         case PASS_THROUGH:
         case CLP:
         case CLPV2:
+        case CLPV2_ZSTD:
+        case CLPV2_LZ4:
           _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
           _dictIdCompressionType = null;
           break;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 3a5eaf775a..cf02527deb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -144,7 +144,10 @@ public class FieldConfig extends BaseJsonConfig {
     // CLP is a special type of compression codec that isn't generally 
applicable to all RAW columns and has a special
     // handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link 
CLPForwardIndexCreatorV2})
     CLP(false, false),
-    CLPV2(false, false);
+    CLPV2(false, false),
+    CLPV2_ZSTD(false, false),
+    CLPV2_LZ4(false, false);
+
     //@formatter:on
 
     private final boolean _applicableToRawIndex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to