saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533482817


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java:
##########
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
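+/**
+ * Writer for CLP forward index.
+ * <p>The CLP forward index file consists of a header followed by five sections, written in this order:
+ * <ul>
+ *   <li>Header: MAGIC_BYTES, version, total number of dictVars, number of bytes per entry of the logType
+ *   dictionary, number of bytes per entry of the dictVars dictionary, and the length in bytes of each of the five
+ *   sections below</li>
+ *   <li>LogType dictionary: dictionary for logType column</li>
+ *   <li>DictVars dictionary: dictionary for dictVars column</li>
+ *   <li>LogType fwd index: fwd index for logType column</li>
+ *   <li>DictVars fwd index: fwd index for dictVars column</li>
+ *   <li>EncodedVars fwd index: raw fwd index for encodedVars column</li>
+ * </ul>
+ */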
+
+public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
+  public static final byte[] MAGIC_BYTES = 
"CLP.v1".getBytes(StandardCharsets.UTF_8); // first bytes written to the final index file by seal()
+  private final String _column; // column name; used as the prefix of the intermediate file names
+  private final int _numDocs; // number of docs as passed to the constructor
+  private final File _intermediateFilesDir; // "<column><raw fwd ext>.clp.tmp" directory holding the intermediate files
+  private final FileChannel _dataFile; // channel of the final forward index file; truncated at the end of seal()
+  private final ByteBuffer _fileBuffer; // read-write mapping of _dataFile that seal() writes into
+  private final EncodedMessage _clpEncodedMessage; // reused as the output holder of every encodeMessage() call
+  private final MessageEncoder _clpMessageEncoder; // CLP encoder used by putString()
+  private final StringColumnPreIndexStatsCollector.CLPStats _clpStats; // CLP stats obtained from the column statistics
+  private final SegmentDictionaryCreator _logTypeDictCreator; // dictionary over the sorted logType values
+  private final SegmentDictionaryCreator _dictVarsDictCreator; // dictionary over the sorted dictVar values
+  private final FixedBitSVForwardIndexWriter _logTypeFwdIndexWriter; // one logType dict id per doc
+  private final FixedBitMVForwardIndexWriter _dictVarsFwdIndexWriter; // dictVar dict ids per doc
+  private final MultiValueFixedByteRawIndexCreator _encodedVarsFwdIndexWriter; // raw long encodedVars per doc
+  private final File _logTypeDictFile; // intermediate file: "<column>_clp_logtype.dict"
+  private final File _dictVarsDictFile; // intermediate file: "<column>_clp_dictvars.dict"
+  private final File _logTypeFwdIndexFile; // intermediate file: "<column>_clp_logtype.fwd"
+  private final File _dictVarsFwdIndexFile; // intermediate file: "<column>_clp_dictvars.fwd"
+  private final File _encodedVarsFwdIndexFile; // intermediate file: "<column>_clp_encodedvars.fwd"
+
+  /**
+   * Creates the CLP forward index creator for one column.
+   *
+   * <p>Sets up (or empties) the intermediate directory, opens and maps the final forward index file, builds the
+   * logType and dictVars dictionaries from the collected CLP stats, and opens the three intermediate forward index
+   * writers that {@link #putString(String)} appends to.
+   *
+   * @param baseIndexDir directory in which the final forward index file and the intermediate directory are created
+   * @param column name of the column
+   * @param numDocs number of documents, passed on to the intermediate forward index writers
+   * @param columnStatistics statistics of the column; cast to {@link CLPStatsProvider} to obtain the CLP stats
+   * @throws IOException if the intermediate directory or any of the index files cannot be prepared
+   */
+  public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int numDocs, ColumnStatistics columnStatistics)
+      throws IOException {
+    _column = column;
+    _numDocs = numDocs;
+
+    // The final file and the intermediate directory share the same base name.
+    String fwdIndexFileName = column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION;
+    _intermediateFilesDir = new File(baseIndexDir, fwdIndexFileName + ".clp.tmp");
+    if (!_intermediateFilesDir.exists()) {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    } else {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    }
+
+    File finalIndexFile = new File(baseIndexDir, fwdIndexFileName);
+    _dataFile = new RandomAccessFile(finalIndexFile, "rw").getChannel();
+    // Map the maximum addressable range up front; seal() truncates the file to the bytes actually written.
+    _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+
+    _clpStats = ((CLPStatsProvider) columnStatistics).getCLPStats();
+
+    // Dictionaries
+    String logTypeDictName = _column + "_clp_logtype.dict";
+    _logTypeDictFile = new File(_intermediateFilesDir, logTypeDictName);
+    _logTypeDictCreator =
+        new SegmentDictionaryCreator(logTypeDictName, FieldSpec.DataType.STRING, _logTypeDictFile, true);
+    _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());
+
+    String dictVarsDictName = _column + "_clp_dictvars.dict";
+    _dictVarsDictFile = new File(_intermediateFilesDir, dictVarsDictName);
+    _dictVarsDictCreator =
+        new SegmentDictionaryCreator(dictVarsDictName, FieldSpec.DataType.STRING, _dictVarsDictFile, true);
+    _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());
+
+    // Forward indexes; the bit width is derived from the largest dictionary id (cardinality - 1).
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_logtype.fwd");
+    int logTypeNumBits = PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 1);
+    _logTypeFwdIndexWriter = new FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs, logTypeNumBits);
+
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_dictvars.fwd");
+    int dictVarsNumBits = PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 1);
+    _dictVarsFwdIndexWriter =
+        new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, _clpStats.getTotalNumberOfDictVars(),
+            dictVarsNumBits);
+
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexWriter =
+        new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, ChunkCompressionType.LZ4, numDocs,
+            FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), false,
+            VarByteChunkForwardIndexWriterV4.VERSION);
+    _clpStats.clear();
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  /**
+   * Reports this forward index as not dictionary encoded.
+   *
+   * @return always {@code false}
+   */
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  /**
+   * Reports this forward index as single-valued.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  /**
+   * Returns the type of the values accepted by this creator.
+   *
+   * @return always {@link FieldSpec.DataType#STRING}
+   */
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public void putBigDecimal(BigDecimal value) {
+    throw new UnsupportedOperationException("Non string types are not 
supported");
+  }
+
+  @Override
+  public void putString(String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, 
e);
+    }
+
+    if (logtype == null) {
+      logtype = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+    }
+
+    if (dictVars == null) {
+      dictVars = new 
String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+    }
+
+    if (encodedVars == null) {
+      encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+    }
+
+    addCLPFields(logtype, dictVars, encodedVars);
+  }
+
+  /**
+   * Appends one document's CLP parts to the intermediate forward indexes: the logtype and the dictionary variables
+   * as dictionary ids, the encoded variables as raw longs.
+   *
+   * @param logtype logtype of the message
+   * @param dictVars dictionary variables of the message
+   * @param encodedVars encoded variables of the message; unboxed before being written
+   */
+  private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars) {
+    // Resolve both dictionary lookups before anything is written.
+    int logTypeId = _logTypeDictCreator.indexOfSV(logtype);
+    int[] dictVarIds = _dictVarsDictCreator.indexOfMV(dictVars);
+
+    _logTypeFwdIndexWriter.putDictId(logTypeId);
+    _dictVarsFwdIndexWriter.putDictIds(dictVarIds);
+
+    // The raw index writer takes primitive longs.
+    long[] unboxed = new long[encodedVars.length];
+    int cursor = 0;
+    for (Long encodedVar : encodedVars) {
+      unboxed[cursor++] = encodedVar;
+    }
+    _encodedVarsFwdIndexWriter.putLongMV(unboxed);
+  }
+
+  /**
+   * Finalizes the intermediate dictionaries and forward indexes and assembles them into the final index file.
+   *
+   * <p>Layout written to the file buffer: MAGIC_BYTES, version, total number of dictVars, bytes per entry of the
+   * logType dictionary, bytes per entry of the dictVars dictionary, the length of each of the five sections, and
+   * then the sections themselves (logType dictionary, dictVars dictionary, logType fwd index, dictVars fwd index,
+   * encodedVars fwd index). The data file is finally truncated to the number of bytes written.
+   *
+   * @throws IOException if closing an intermediate writer, reading an intermediate file or truncating the data file
+   *     fails
+   * @throws ArithmeticException if an intermediate file is too large for its length to be stored as an int in the
+   *     header
+   */
+  @Override
+  public void seal()
+      throws IOException {
+    // Close all intermediate writers first so that the on-disk file lengths are final.
+    _logTypeDictCreator.seal();
+    _logTypeDictCreator.close();
+
+    _dictVarsDictCreator.seal();
+    _dictVarsDictCreator.close();
+
+    _logTypeFwdIndexWriter.close();
+    _dictVarsFwdIndexWriter.close();
+    _encodedVarsFwdIndexWriter.close();
+
+    // Sections in the order they appear in the final file; the header lists their lengths in the same order.
+    File[] sections = {
+        _logTypeDictFile, _dictVarsDictFile, _logTypeFwdIndexFile, _dictVarsFwdIndexFile,
+        _encodedVarsFwdIndexFile
+    };
+
+    _fileBuffer.put(MAGIC_BYTES);
+    _fileBuffer.putInt(1); // version
+    // NOTE(review): the constructor calls _clpStats.clear() before this read - confirm that clear() keeps the
+    // total number of dictVars.
+    _fileBuffer.putInt(_clpStats.getTotalNumberOfDictVars());
+    _fileBuffer.putInt(_logTypeDictCreator.getNumBytesPerEntry());
+    _fileBuffer.putInt(_dictVarsDictCreator.getNumBytesPerEntry());
+    for (File section : sections) {
+      // Fail loudly instead of silently writing a wrapped-around length into the header.
+      _fileBuffer.putInt(Math.toIntExact(section.length()));
+    }
+
+    for (File section : sections) {
+      copyFileIntoBuffer(section);
+    }
+
+    // The buffer position is the total number of bytes written above; drop the unused tail of the mapped file.
+    _dataFile.truncate(_fileBuffer.position());
+  }
+
+  /**
+   * Appends the full contents of the given file to the file buffer at its current position.
+   *
+   * @param file file to copy; it is opened read-only and mapped in one piece
+   * @throws IOException if the file cannot be opened or mapped
+   */
+  private void copyFileIntoBuffer(File file)
+      throws IOException {
+    try (FileChannel source = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      ByteBuffer contents = source.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
+      _fileBuffer.put(contents);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Delete all temp file

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to