This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 38fb0b5f8a Add the possibility of configuring ForwardIndexes with compressionCodec (#12218) 38fb0b5f8a is described below commit 38fb0b5f8a968e0e11f45b6173b931b9e6899ee4 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Thu Feb 22 07:56:45 2024 +0100 Add the possibility of configuring ForwardIndexes with compressionCodec (#12218) --- .../index/forward/ForwardIndexTypeTest.java | 118 ++++++++++++-- .../segment/spi/index/ForwardIndexConfig.java | 172 +++++++++++++++------ 2 files changed, 228 insertions(+), 62 deletions(-) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java index c72a95a4e9..25f6380dcb 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java @@ -22,6 +22,7 @@ package org.apache.pinot.segment.local.segment.index.forward; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract; @@ -42,17 +43,42 @@ import static org.testng.Assert.assertSame; public class ForwardIndexTypeTest { - @DataProvider(name = "allChunkCompressionType") - public static Object[][] allChunkCompressionType() { - return new String[][] { - new String[] {"PASS_THROUGH"}, - new String[] {"SNAPPY"}, - new String[] {"ZSTANDARD"}, - new String[] {"LZ4"}, - new String[] {null} + @DataProvider(name = "allCompressionCodec") + public static Object[][] allCompressionCodec() { + 
return new Object[][] { + new Object[] {"PASS_THROUGH", ChunkCompressionType.PASS_THROUGH, null}, + new Object[] {"SNAPPY", ChunkCompressionType.SNAPPY, null}, + new Object[] {"ZSTANDARD", ChunkCompressionType.ZSTANDARD, null}, + new Object[] {"LZ4", ChunkCompressionType.LZ4, null}, + new Object[] {"MV_ENTRY_DICT", null, DictIdCompressionType.MV_ENTRY_DICT}, + new Object[] {null, null, null} }; } + @DataProvider(name = "allChunkCompression") + public static Object[][] allChuckCompression() { + return Arrays.stream(allCompressionCodec()) + .filter(values -> { + Object compression = values[0]; + FieldConfig.CompressionCodec compressionCodec = compression == null ? null + : FieldConfig.CompressionCodec.valueOf(compression.toString()); + return compressionCodec == null || compressionCodec.isApplicableToRawIndex(); + }) + .toArray(Object[][]::new); + } + + @DataProvider(name = "allDictCompression") + public static Object[][] allDictCompression() { + return Arrays.stream(allCompressionCodec()) + .filter(values -> { + Object compression = values[0]; + FieldConfig.CompressionCodec compressionCodec = compression == null ? 
null + : FieldConfig.CompressionCodec.valueOf(compression.toString()); + return compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex(); + }) + .toArray(Object[][]::new); + } + public static class ConfTest extends AbstractSerdeIndexContract { protected void assertEquals(ForwardIndexConfig expected) { @@ -189,8 +215,9 @@ public class ForwardIndexTypeTest { assertEquals(ForwardIndexConfig.DEFAULT); } - @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class) - public void oldConfEnableRawWithCompression(String compression) + @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class) + public void oldConfEnableRawWithCompression(String compression, + ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression) throws IOException { String valueJson = compression == null ? "null" : "\"" + compression + "\""; @@ -204,7 +231,9 @@ public class ForwardIndexTypeTest { assertEquals( new ForwardIndexConfig.Builder() - .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression)) + .withCompressionCodec(compression == null ? 
null : FieldConfig.CompressionCodec.valueOf(compression)) + .withCompressionType(expectedChunkCompression) + .withDictIdCompressionType(expectedDictCompression) .withDeriveNumDocsPerChunk(false) .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION) .build() @@ -296,8 +325,9 @@ public class ForwardIndexTypeTest { new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build()); } - @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class) - public void newConfigEnabled(String compression) + @Test(dataProvider = "allChunkCompression", dataProviderClass = ForwardIndexTypeTest.class) + public void newConfigEnabledWithChunkCompression(String compression, + ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression) throws IOException { String valueJson = compression == null ? "null" : "\"" + compression + "\""; addFieldIndexConfig("" @@ -315,7 +345,67 @@ public class ForwardIndexTypeTest { assertEquals( new ForwardIndexConfig.Builder() - .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression)) + .withCompressionType(expectedChunkCompression) + .withDictIdCompressionType(expectedDictCompression) + .withDeriveNumDocsPerChunk(true) + .withRawIndexWriterVersion(10) + .build() + ); + } + + @Test(dataProvider = "allDictCompression", dataProviderClass = ForwardIndexTypeTest.class) + public void newConfigEnabledWithDictCompression(String compression, + ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression) + throws IOException { + String valueJson = compression == null ? 
"null" : "\"" + compression + "\""; + addFieldIndexConfig("" + + " {\n" + + " \"name\": \"dimInt\"," + + " \"indexes\" : {" + + " \"forward\": {" + + " \"dictIdCompressionType\": " + valueJson + ",\n" + + " \"deriveNumDocsPerChunk\": true,\n" + + " \"rawIndexWriterVersion\": 10\n" + + " }" + + " }\n" + + " }" + ); + + assertEquals( + new ForwardIndexConfig.Builder() + .withCompressionCodec(compression == null ? null : FieldConfig.CompressionCodec.valueOf(compression)) + .withCompressionType(expectedChunkCompression) + .withDictIdCompressionType(expectedDictCompression) + .withDeriveNumDocsPerChunk(true) + .withRawIndexWriterVersion(10) + .build() + ); + } + @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class) + public void newConfigEnabledWithCompressionCodec(String compression, + ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression) + throws IOException { + FieldConfig.CompressionCodec compressionCodec = compression == null ? null + : FieldConfig.CompressionCodec.valueOf(compression); + + String valueJson = compression == null ? 
"null" : "\"" + compression + "\""; + addFieldIndexConfig("" + + " {\n" + + " \"name\": \"dimInt\"," + + " \"indexes\" : {" + + " \"forward\": {" + + " \"compressionCodec\": " + valueJson + ",\n" + + " \"deriveNumDocsPerChunk\": true,\n" + + " \"rawIndexWriterVersion\": 10\n" + + " }" + + " }\n" + + " }" + ); + + assertEquals( + new ForwardIndexConfig.Builder() + .withCompressionType(expectedChunkCompression) + .withDictIdCompressionType(expectedDictCompression) .withDeriveNumDocsPerChunk(true) .withRawIndexWriterVersion(10) .build() diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index fcdbbe4fe0..5db086a13b 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ -20,7 +20,9 @@ package org.apache.pinot.segment.spi.index; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -33,33 +35,104 @@ import org.apache.pinot.spi.config.table.IndexConfig; public class ForwardIndexConfig extends IndexConfig { public static final int DEFAULT_RAW_WRITER_VERSION = 2; - public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null); + public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null, null); public static final ForwardIndexConfig DEFAULT = new Builder().build(); @Nullable - private final ChunkCompressionType _chunkCompressionType; + private final CompressionCodec _compressionCodec; private final boolean _deriveNumDocsPerChunk; private final int 
_rawIndexWriterVersion; + @Nullable + private final ChunkCompressionType _chunkCompressionType; @Nullable private final DictIdCompressionType _dictIdCompressionType; - @JsonCreator - public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled, - @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType, - @JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk, - @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion, - @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) { + public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec compressionCodec, + @Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer rawIndexWriterVersion) { super(disabled); - _chunkCompressionType = chunkCompressionType; - _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk; + _deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk); _rawIndexWriterVersion = rawIndexWriterVersion == null ? 
DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion; - _dictIdCompressionType = dictIdCompressionType; + _compressionCodec = compressionCodec; + + if (compressionCodec != null) { + switch (compressionCodec) { + case PASS_THROUGH: + _chunkCompressionType = ChunkCompressionType.PASS_THROUGH; + _dictIdCompressionType = null; + break; + case SNAPPY: + _chunkCompressionType = ChunkCompressionType.SNAPPY; + _dictIdCompressionType = null; + break; + case ZSTANDARD: + _chunkCompressionType = ChunkCompressionType.ZSTANDARD; + _dictIdCompressionType = null; + break; + case LZ4: + _chunkCompressionType = ChunkCompressionType.LZ4; + _dictIdCompressionType = null; + break; + case MV_ENTRY_DICT: + _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; + _chunkCompressionType = null; + break; + default: + throw new IllegalStateException("Unsupported compression codec: " + compressionCodec); + } + } else { + _dictIdCompressionType = null; + _chunkCompressionType = null; + } + } + + @JsonCreator + public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled, + @JsonProperty("compressionCodec") @Nullable CompressionCodec compressionCodec, + @Deprecated @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType, + @Deprecated @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType, + @JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk, + @JsonProperty("rawIndexWriterVersion") @Nullable Integer rawIndexWriterVersion) { + this(disabled, getActualCompressionCodec(compressionCodec, chunkCompressionType, dictIdCompressionType), + deriveNumDocsPerChunk, rawIndexWriterVersion); + } + + public static CompressionCodec getActualCompressionCodec(@Nullable CompressionCodec compressionCodec, + @Nullable ChunkCompressionType chunkCompressionType, @Nullable DictIdCompressionType dictIdCompressionType) { + if (compressionCodec != null) { + return compressionCodec; + } + if 
(chunkCompressionType != null && dictIdCompressionType != null) { + throw new IllegalArgumentException("chunkCompressionType and dictIdCompressionType should not be used together"); + } + if (chunkCompressionType != null) { + switch (chunkCompressionType) { + case PASS_THROUGH: + return CompressionCodec.PASS_THROUGH; + case SNAPPY: + return CompressionCodec.SNAPPY; + case ZSTANDARD: + return CompressionCodec.ZSTANDARD; + case LZ4: + return CompressionCodec.LZ4; + default: + throw new IllegalStateException("Unsupported chunk compression type: " + chunkCompressionType); + } + } else if (dictIdCompressionType != null) { + switch (dictIdCompressionType) { + case MV_ENTRY_DICT: + return CompressionCodec.MV_ENTRY_DICT; + default: + throw new IllegalStateException("Unsupported dictionary compression type: " + dictIdCompressionType); + } + } else { + return null; + } } @Nullable - public ChunkCompressionType getChunkCompressionType() { - return _chunkCompressionType; + public CompressionCodec getCompressionCodec() { + return _compressionCodec; } public boolean isDeriveNumDocsPerChunk() { @@ -70,6 +143,13 @@ public class ForwardIndexConfig extends IndexConfig { return _rawIndexWriterVersion; } + @JsonIgnore + @Nullable + public ChunkCompressionType getChunkCompressionType() { + return _chunkCompressionType; + } + + @JsonIgnore @Nullable public DictIdCompressionType getDictIdCompressionType() { return _dictIdCompressionType; @@ -87,38 +167,32 @@ public class ForwardIndexConfig extends IndexConfig { return false; } ForwardIndexConfig that = (ForwardIndexConfig) o; - return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk - && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType - && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType); + return _compressionCodec == that._compressionCodec && _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk + && _rawIndexWriterVersion == 
that._rawIndexWriterVersion; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion, - _dictIdCompressionType); + return Objects.hash(super.hashCode(), _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion); } public static class Builder { @Nullable - private ChunkCompressionType _chunkCompressionType; + private CompressionCodec _compressionCodec; private boolean _deriveNumDocsPerChunk = false; private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION; - @Nullable - private DictIdCompressionType _dictIdCompressionType; - public Builder() { } public Builder(ForwardIndexConfig other) { - _chunkCompressionType = other.getChunkCompressionType(); + _compressionCodec = other._compressionCodec; _deriveNumDocsPerChunk = other._deriveNumDocsPerChunk; _rawIndexWriterVersion = other._rawIndexWriterVersion; - _dictIdCompressionType = other._dictIdCompressionType; } - public Builder withCompressionType(ChunkCompressionType chunkCompressionType) { - _chunkCompressionType = chunkCompressionType; + public Builder withCompressionCodec(CompressionCodec compressionCodec) { + _compressionCodec = compressionCodec; return this; } @@ -132,36 +206,39 @@ public class ForwardIndexConfig extends IndexConfig { return this; } - public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) { - _dictIdCompressionType = dictIdCompressionType; - return this; - } - - public Builder withCompressionCodec(CompressionCodec compressionCodec) { - if (compressionCodec == null) { - _chunkCompressionType = null; - _dictIdCompressionType = null; + @Deprecated + public Builder withCompressionType(ChunkCompressionType chunkCompressionType) { + if (chunkCompressionType == null) { return this; } - switch (compressionCodec) { + switch (chunkCompressionType) { + case LZ4: + case LZ4_LENGTH_PREFIXED: + _compressionCodec = CompressionCodec.LZ4; + break; case PASS_THROUGH: - 
_chunkCompressionType = ChunkCompressionType.PASS_THROUGH; + _compressionCodec = CompressionCodec.PASS_THROUGH; break; case SNAPPY: - _chunkCompressionType = ChunkCompressionType.SNAPPY; + _compressionCodec = CompressionCodec.SNAPPY; break; case ZSTANDARD: - _chunkCompressionType = ChunkCompressionType.ZSTANDARD; - break; - case LZ4: - _chunkCompressionType = ChunkCompressionType.LZ4; - break; - case MV_ENTRY_DICT: - _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; + _compressionCodec = CompressionCodec.ZSTANDARD; break; default: - throw new IllegalStateException("Unsupported compression codec: " + compressionCodec); + throw new IllegalArgumentException("Unsupported chunk compression type: " + chunkCompressionType); + } + return this; + } + + @Deprecated + public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) { + if (dictIdCompressionType == null) { + return this; } + Preconditions.checkArgument(dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT, + "Unsupported dictionary compression type: " + dictIdCompressionType); + _compressionCodec = CompressionCodec.MV_ENTRY_DICT; return this; } @@ -188,8 +265,7 @@ public class ForwardIndexConfig extends IndexConfig { } public ForwardIndexConfig build() { - return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion, - _dictIdCompressionType); + return new ForwardIndexConfig(false, _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org